Skip to content

Commit

Permalink
chore: introduce concurrency and perpetual tip sync chase
Browse files Browse the repository at this point in the history
  • Loading branch information
EclesioMeloJunior committed Aug 30, 2024
1 parent 43eef82 commit c22065f
Show file tree
Hide file tree
Showing 9 changed files with 486 additions and 123 deletions.
5 changes: 5 additions & 0 deletions dot/peerset/constants.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,11 @@ const (
// GoodTransactionReason is the reason for used for good transaction.
GoodTransactionReason = "Good Transaction"

// NotRelevantBlockAnnounce when peer sends us a not relevant block
NotRelevantBlockAnnounceValue Reputation = -(1 << 2)
// BadTransactionReason when transaction import was not performed.
NotRelevantBlockAnnounceReason = "Not Relevant Block Announce"

// BadTransactionValue used when transaction import was not performed.
BadTransactionValue Reputation = -(1 << 12)
// BadTransactionReason when transaction import was not performed.
Expand Down
154 changes: 91 additions & 63 deletions dot/sync/fullsync.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@ import (
"github.com/ChainSafe/gossamer/dot/peerset"
"github.com/ChainSafe/gossamer/dot/types"
"github.com/ChainSafe/gossamer/internal/database"
"github.com/ChainSafe/gossamer/lib/common"
"github.com/ChainSafe/gossamer/lib/common/variadic"

"github.com/libp2p/go-libp2p/core/peer"
Expand All @@ -28,11 +27,10 @@ const defaultNumOfTasks = 3
var _ Strategy = (*FullSyncStrategy)(nil)

var (
errFailedToGetParent = errors.New("failed to get parent header")
errNilHeaderInResponse = errors.New("expected header, received none")
errNilBodyInResponse = errors.New("expected body, received none")
errPeerOnInvalidFork = errors.New("peer is on an invalid fork")
errMismatchBestBlockAnnouncement = errors.New("mismatch best block announcement")
errFailedToGetParent = errors.New("failed to get parent header")
errNilHeaderInResponse = errors.New("expected header, received none")
errNilBodyInResponse = errors.New("expected body, received none")
errPeerOnInvalidFork = errors.New("peer is on an invalid fork")

blockSizeGauge = promauto.NewGauge(prometheus.GaugeOpts{
Namespace: "gossamer_sync",
Expand Down Expand Up @@ -81,16 +79,12 @@ func NewFullSyncStrategy(cfg *FullSyncConfig) *FullSyncStrategy {
}

return &FullSyncStrategy{
badBlocks: cfg.BadBlocks,
reqMaker: cfg.RequestMaker,
blockState: cfg.BlockState,
numOfTasks: cfg.NumOfTasks,
importer: newBlockImporter(cfg),
unreadyBlocks: &unreadyBlocks{
incompleteBlocks: make(map[common.Hash]*types.BlockData),
// TODO: cap disjoitChains to don't grows indefinitely
disjointChains: make([][]*types.BlockData, 0),
},
badBlocks: cfg.BadBlocks,
reqMaker: cfg.RequestMaker,
blockState: cfg.BlockState,
numOfTasks: cfg.NumOfTasks,
importer: newBlockImporter(cfg),
unreadyBlocks: newUnreadyBlocks(),
requestQueue: &requestsQueue[*messages.BlockRequestMessage]{
queue: list.New(),
},
Expand All @@ -105,9 +99,12 @@ func (f *FullSyncStrategy) NextActions() ([]*syncTask, error) {
f.startedAt = time.Now()
f.syncedBlocks = 0

if f.requestQueue.Len() > 0 {
message, _ := f.requestQueue.PopFront()
return f.createTasks([]*messages.BlockRequestMessage{message}), nil
messagesToSend := []*messages.BlockRequestMessage{}
for f.requestQueue.Len() > 0 {
msg, ok := f.requestQueue.PopFront()
if ok {
messagesToSend = append(messagesToSend, msg)
}
}

currentTarget := f.peers.getTarget()
Expand All @@ -117,13 +114,22 @@ func (f *FullSyncStrategy) NextActions() ([]*syncTask, error) {
}

// our best block is equal or ahead of current target.
// in the nodes pov we are not legging behind so there's nothing to do
// in the node's pov we are not legging behind so there's nothing to do
// or we didn't receive block announces, so lets ask for more blocks
if uint32(bestBlockHeader.Number) >= currentTarget {
return nil, nil
ascendingBlockRequests := messages.NewBlockRequest(
*variadic.Uint32OrHashFrom(bestBlockHeader.Hash()),
messages.MaxBlocksInResponse,
messages.BootstrapRequestData,
messages.Ascending,
)

messagesToSend = append(messagesToSend, ascendingBlockRequests)
return f.createTasks(messagesToSend), nil
}

startRequestAt := bestBlockHeader.Number + 1
targetBlockNumber := startRequestAt + 127
targetBlockNumber := startRequestAt + maxRequestsAllowed*127

if targetBlockNumber > uint(currentTarget) {
targetBlockNumber = uint(currentTarget)
Expand All @@ -150,7 +156,9 @@ func (f *FullSyncStrategy) createTasks(requests []*messages.BlockRequestMessage)

func (f *FullSyncStrategy) IsFinished(results []*syncTaskResult) (bool, []Change, []peer.ID, error) {
repChanges, peersToIgnore, validResp := validateResults(results, f.badBlocks)
logger.Debugf("evaluating %d task results, %d valid responses", len(results), len(validResp))

var highestFinalized *types.Header
highestFinalized, err := f.blockState.GetHighestFinalisedHeader()
if err != nil {
return false, nil, nil, fmt.Errorf("getting highest finalized header")
Expand Down Expand Up @@ -203,6 +211,10 @@ func (f *FullSyncStrategy) IsFinished(results []*syncTaskResult) (bool, []Change
disjointFragments = append(disjointFragments, fragment)
}

logger.Debugf("blocks to import: %d, disjoint fragments: %d", len(nextBlocksToImport), len(disjointFragments))

// this loop goal is to import ready blocks as well as
// update the highestFinalized header
for len(nextBlocksToImport) > 0 || len(disjointFragments) > 0 {
for _, blockToImport := range nextBlocksToImport {
imported, err := f.importer.handle(blockToImport, networkInitialSync)
Expand All @@ -216,16 +228,15 @@ func (f *FullSyncStrategy) IsFinished(results []*syncTaskResult) (bool, []Change
}

nextBlocksToImport = make([]*types.BlockData, 0)
highestFinalized, err = f.blockState.GetHighestFinalisedHeader()
if err != nil {
return false, nil, nil, fmt.Errorf("getting highest finalized header")
}

// check if blocks from the disjoint set can be imported on their on forks
// given that fragment contains chains and these chains contains blocks
// check if the first block in the chain contains a parent known by us
for _, fragment := range disjointFragments {
highestFinalized, err := f.blockState.GetHighestFinalisedHeader()
if err != nil {
return false, nil, nil, fmt.Errorf("getting highest finalized header")
}

validFragment := validBlocksUnderFragment(highestFinalized.Number, fragment)
if len(validFragment) == 0 {
continue
Expand All @@ -249,7 +260,7 @@ func (f *FullSyncStrategy) IsFinished(results []*syncTaskResult) (bool, []Change
validFragment[0].Header.Hash(),
)

f.unreadyBlocks.newFragment(validFragment)
f.unreadyBlocks.newDisjointFragemnt(validFragment)
request := messages.NewBlockRequest(
*variadic.Uint32OrHashFrom(validFragment[0].Header.ParentHash),
messages.MaxBlocksInResponse,
Expand All @@ -264,6 +275,7 @@ func (f *FullSyncStrategy) IsFinished(results []*syncTaskResult) (bool, []Change
disjointFragments = nil
}

f.unreadyBlocks.removeIrrelevantFragments(highestFinalized.Number)
return false, repChanges, peersToIgnore, nil
}

Expand All @@ -272,7 +284,7 @@ func (f *FullSyncStrategy) ShowMetrics() {
bps := float64(f.syncedBlocks) / totalSyncAndImportSeconds
logger.Infof("⛓️ synced %d blocks, disjoint fragments %d, incomplete blocks %d, "+
"took: %.2f seconds, bps: %.2f blocks/second, target block number #%d",
f.syncedBlocks, len(f.unreadyBlocks.disjointChains), len(f.unreadyBlocks.incompleteBlocks),
f.syncedBlocks, len(f.unreadyBlocks.disjointFragments), len(f.unreadyBlocks.incompleteBlocks),
totalSyncAndImportSeconds, bps, f.peers.getTarget())
}

Expand All @@ -282,75 +294,91 @@ func (f *FullSyncStrategy) OnBlockAnnounceHandshake(from peer.ID, msg *network.B
}

func (f *FullSyncStrategy) OnBlockAnnounce(from peer.ID, msg *network.BlockAnnounceMessage) (
repChange *Change, err error) {
gossip bool, repChange *Change, err error) {
if f.blockState.IsPaused() {
return nil, errors.New("blockstate service is paused")
}

currentTarget := f.peers.getTarget()
if msg.Number >= uint(currentTarget) {
return nil, nil
return false, nil, errors.New("blockstate service is paused")
}

blockAnnounceHeader := types.NewHeader(msg.ParentHash, msg.StateRoot, msg.ExtrinsicsRoot, msg.Number, msg.Digest)
blockAnnounceHeaderHash := blockAnnounceHeader.Hash()

if msg.BestBlock {
pv := f.peers.get(from)
if uint(pv.bestBlockNumber) != msg.Number || blockAnnounceHeaderHash != pv.bestBlockHash {
repChange = &Change{
who: from,
rep: peerset.ReputationChange{
Value: peerset.BadMessageValue,
Reason: peerset.BadMessageReason,
},
}
return repChange, fmt.Errorf("%w: peer %s, on handshake #%d (%s), on announce #%d (%s)",
errMismatchBestBlockAnnouncement, from,
pv.bestBlockNumber, pv.bestBlockHash.String(),
msg.Number, blockAnnounceHeaderHash.String())
}
}

logger.Infof("received block announce from %s: #%d (%s) best block: %v",
from,
blockAnnounceHeader.Number,
blockAnnounceHeaderHash,
msg.BestBlock,
)

// check if their best block is on an invalid chain, if it is,
// potentially downscore them for now, we can remove them from the syncing peers set
if slices.Contains(f.badBlocks, blockAnnounceHeaderHash.String()) {
return false, &Change{
who: from,
rep: peerset.ReputationChange{
Value: peerset.BadBlockAnnouncementValue,
Reason: peerset.BadBlockAnnouncementReason,
},
}, nil
}

highestFinalized, err := f.blockState.GetHighestFinalisedHeader()
if err != nil {
return nil, fmt.Errorf("get highest finalised header: %w", err)
return false, nil, fmt.Errorf("get highest finalised header: %w", err)
}

if blockAnnounceHeader.Number <= highestFinalized.Number {
// check if the announced block is relevant
if blockAnnounceHeader.Number <= highestFinalized.Number || f.blockAlreadyTracked(blockAnnounceHeader) {
logger.Debugf("announced block irrelevant #%d (%s)", blockAnnounceHeader.Number, blockAnnounceHeaderHash.Short())
repChange = &Change{
who: from,
rep: peerset.ReputationChange{
Value: peerset.BadBlockAnnouncementValue,
Reason: peerset.BadBlockAnnouncementReason,
Value: peerset.NotRelevantBlockAnnounceValue,
Reason: peerset.NotRelevantBlockAnnounceReason,
},
}
return repChange, fmt.Errorf("%w: peer %s, block number #%d (%s)",

return false, repChange, fmt.Errorf("%w: peer %s, block number #%d (%s)",
errPeerOnInvalidFork, from, blockAnnounceHeader.Number, blockAnnounceHeaderHash.String())
}

logger.Debugf("relevant announced block #%d (%s)", blockAnnounceHeader.Number, blockAnnounceHeaderHash.Short())
bestBlockHeader, err := f.blockState.BestBlockHeader()
if err != nil {
return false, nil, fmt.Errorf("get best block header: %w", err)
}

// if we still far from aproaching the calculated target
// then we can ignore the block announce
ratioOfCompleteness := (bestBlockHeader.Number / uint(f.peers.getTarget())) * 100
logger.Infof("ratio of completeness: %d", ratioOfCompleteness)
if ratioOfCompleteness < 80 {
return true, nil, nil
}

has, err := f.blockState.HasHeader(blockAnnounceHeaderHash)
if err != nil {
return nil, fmt.Errorf("checking if header exists: %w", err)
return false, nil, fmt.Errorf("checking if header exists: %w", err)
}

if !has {
f.unreadyBlocks.newHeader(blockAnnounceHeader)
f.unreadyBlocks.newIncompleteBlock(blockAnnounceHeader)
logger.Infof("requesting announced block body #%d (%s)", blockAnnounceHeader.Number, blockAnnounceHeaderHash.Short())
request := messages.NewBlockRequest(*variadic.Uint32OrHashFrom(blockAnnounceHeaderHash),
1, messages.RequestedDataBody+messages.RequestedDataJustification, messages.Ascending)
f.requestQueue.PushBack(request)
}

return nil, nil
logger.Infof("block announced already exists #%d (%s)", blockAnnounceHeader.Number, blockAnnounceHeaderHash.Short())
return true, &Change{
who: from,
rep: peerset.ReputationChange{
Value: peerset.NotRelevantBlockAnnounceValue,
Reason: peerset.NotRelevantBlockAnnounceReason,
},
}, nil
}

func (f *FullSyncStrategy) blockAlreadyTracked(announcedHeader *types.Header) bool {
return f.unreadyBlocks.isIncomplete(announcedHeader.Hash()) ||
f.unreadyBlocks.inDisjointFragment(announcedHeader.Hash(), announcedHeader.Number)
}

func (f *FullSyncStrategy) IsSynced() bool {
Expand Down
11 changes: 5 additions & 6 deletions dot/sync/fullsync_handle_block.go
Original file line number Diff line number Diff line change
Expand Up @@ -96,11 +96,9 @@ func (b *blockImporter) handle(bd *types.BlockData, origin BlockOrigin) (importe
// or the index of the block data that errored on failure.
// TODO: https://github.com/ChainSafe/gossamer/issues/3468
func (b *blockImporter) processBlockData(blockData types.BlockData, origin BlockOrigin) error {
announceImportedBlock := false

if blockData.Header != nil {
if blockData.Body != nil {
err := b.processBlockDataWithHeaderAndBody(blockData, origin, announceImportedBlock)
err := b.processBlockDataWithHeaderAndBody(blockData, origin)
if err != nil {
return fmt.Errorf("processing block data with header and body: %w", err)
}
Expand All @@ -123,7 +121,7 @@ func (b *blockImporter) processBlockData(blockData types.BlockData, origin Block
}

func (b *blockImporter) processBlockDataWithHeaderAndBody(blockData types.BlockData,
origin BlockOrigin, announceImportedBlock bool) (err error) {
origin BlockOrigin) (err error) {

if origin != networkInitialSync {
err = b.babeVerifier.VerifyBlock(blockData.Header)
Expand All @@ -145,7 +143,7 @@ func (b *blockImporter) processBlockDataWithHeaderAndBody(blockData types.BlockD
Body: *blockData.Body,
}

err = b.handleBlock(block, announceImportedBlock)
err = b.handleBlock(block)
if err != nil {
return fmt.Errorf("handling block: %w", err)
}
Expand All @@ -154,7 +152,7 @@ func (b *blockImporter) processBlockDataWithHeaderAndBody(blockData types.BlockD
}

// handleHeader handles blocks (header+body) included in BlockResponses
func (b *blockImporter) handleBlock(block *types.Block, announceImportedBlock bool) error {
func (b *blockImporter) handleBlock(block *types.Block) error {
parent, err := b.blockState.GetHeader(block.Header.ParentHash)
if err != nil {
return fmt.Errorf("%w: %s", errFailedToGetParent, err)
Expand Down Expand Up @@ -185,6 +183,7 @@ func (b *blockImporter) handleBlock(block *types.Block, announceImportedBlock bo
return fmt.Errorf("failed to execute block %d: %w", block.Header.Number, err)
}

announceImportedBlock := false
if err = b.blockImportHandler.HandleBlockImport(block, ts, announceImportedBlock); err != nil {
return err
}
Expand Down
Loading

0 comments on commit c22065f

Please sign in to comment.