feat(dot/sync): improve worker pool
The main difference in the worker pool API is that SubmitBatch() does
not block until the whole batch has been processed. Instead, it returns
an ID which can be used to retrieve the current state of the batch.
In addition, Results() returns a channel over which task results are
sent as they become available.
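
A rough sketch of the new surface (a hedged illustration only; BatchID
and BatchStatus are placeholder names, not necessarily the types used
in dot/sync):

    // Hypothetical shape of the non-blocking pool API.
    type WorkerPool interface {
        // SubmitBatch enqueues every task and returns immediately with
        // an ID identifying the batch.
        SubmitBatch(tasks []Task) BatchID
        // BatchStatus reports how far the given batch has progressed.
        BatchStatus(id BatchID) BatchStatus
        // Results streams task results as workers finish them.
        Results() <-chan TaskResult
    }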

The main improvement this brings is increased concurrency, since results
can be processed before the whole batch has been completed.

What has not changed is the overall flow of the Strategy interface:
getting a new batch of tasks with NextActions() and processing the
results with Process().
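
Roughly, the loop driving a strategy now reads as below (an
illustrative sketch, not the literal service code: error handling is
trimmed and the pool variable is assumed, while the NextActions() and
Process() signatures match the fullsync.go diff):

    tasks, err := strategy.NextActions()
    if err != nil {
        return err
    }

    // SubmitBatch returns right away; the ID can be used later to
    // check how far the batch has progressed.
    batchID := pool.SubmitBatch(tasks)
    _ = batchID

    // Process consumes results as they arrive on the channel, so
    // blocks can be imported before the whole batch has finished.
    done, reputations, bans, err := strategy.Process(pool.Results())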

Closes #4232
haikoschol committed Oct 14, 2024
1 parent f55ee02 commit 482000e
Showing 4 changed files with 29 additions and 424 deletions.
219 changes: 4 additions & 215 deletions dot/sync/fullsync.go
@@ -132,7 +132,7 @@ func (f *FullSyncStrategy) NextActions() ([]Task, error) {
func (f *FullSyncStrategy) createTasks(requests []*messages.BlockRequestMessage) []Task {
tasks := make([]Task, 0, len(requests))
for _, req := range requests {
tasks = append(tasks, &SyncTask{
tasks = append(tasks, &syncTask{
request: req,
requestMaker: f.reqMaker,
})
@@ -152,10 +152,10 @@ func (f *FullSyncStrategy) Process(results <-chan TaskResult) (
if err != nil {
return false, nil, nil, fmt.Errorf("getting highest finalized header")
}
readyBlocks := make([][]*types.BlockData, 0)

// This is safe since we are the only goroutine reading from the channel.
// This is safe as long as we are the only goroutine reading from the channel.
for len(results) > 0 {
readyBlocks := make([][]*types.BlockData, 0)
result := <-results
repChange, ignorePeer, validResp := validateResult(result, f.badBlocks)

@@ -280,128 +280,6 @@ func (f *FullSyncStrategy) Process(results <-chan TaskResult) (
return false, reputations, bans, nil
}

//func (f *FullSyncStrategy) Process(results []*SyncTaskResult) (
// isFinished bool, reputations []Change, bans []peer.ID, err error) {
// repChanges, peersToIgnore, validResp := validateResults(results, f.badBlocks)
// logger.Debugf("evaluating %d task results, %d valid responses", len(results), len(validResp))
//
// var highestFinalized *types.Header
// highestFinalized, err = f.blockState.GetHighestFinalisedHeader()
// if err != nil {
// return false, nil, nil, fmt.Errorf("getting highest finalized header")
// }
//
// readyBlocks := make([][]*types.BlockData, 0, len(validResp))
// for _, reqRespData := range validResp {
// // if Gossamer requested the header, then the response data should contains
// // the full blocks to be imported. If Gossamer didn't request the header,
// // then the response should only contain the missing parts that will complete
// // the unreadyBlocks and then with the blocks completed we should be able to import them
// if reqRespData.req.RequestField(messages.RequestedDataHeader) {
// updatedFragment, ok := f.unreadyBlocks.updateDisjointFragments(reqRespData.responseData)
// if ok {
// validBlocks := validBlocksUnderFragment(highestFinalized.Number, updatedFragment)
// if len(validBlocks) > 0 {
// readyBlocks = append(readyBlocks, validBlocks)
// }
// } else {
// readyBlocks = append(readyBlocks, reqRespData.responseData)
// }
//
// continue
// }
//
// completedBlocks := f.unreadyBlocks.updateIncompleteBlocks(reqRespData.responseData)
// readyBlocks = append(readyBlocks, completedBlocks)
// }
//
// // disjoint fragments are pieces of the chain that could not be imported right now
// // because is blocks too far ahead or blocks that belongs to forks
// sortFragmentsOfChain(readyBlocks)
// orderedFragments := mergeFragmentsOfChain(readyBlocks)
//
// nextBlocksToImport := make([]*types.BlockData, 0)
// disjointFragments := make([][]*types.BlockData, 0)
//
// for _, fragment := range orderedFragments {
// ok, err := f.blockState.HasHeader(fragment[0].Header.ParentHash)
// if err != nil && !errors.Is(err, database.ErrNotFound) {
// return false, nil, nil, fmt.Errorf("checking block parent header: %w", err)
// }
//
// if ok {
// nextBlocksToImport = append(nextBlocksToImport, fragment...)
// continue
// }
//
// disjointFragments = append(disjointFragments, fragment)
// }
//
// // this loop goal is to import ready blocks as well as update the highestFinalized header
// for len(nextBlocksToImport) > 0 || len(disjointFragments) > 0 {
// for _, blockToImport := range nextBlocksToImport {
// imported, err := f.blockImporter.importBlock(blockToImport, networkInitialSync)
// if err != nil {
// return false, nil, nil, fmt.Errorf("while handling ready block: %w", err)
// }
//
// if imported {
// f.syncedBlocks += 1
// }
// }
//
// nextBlocksToImport = make([]*types.BlockData, 0)
// highestFinalized, err = f.blockState.GetHighestFinalisedHeader()
// if err != nil {
// return false, nil, nil, fmt.Errorf("getting highest finalized header")
// }
//
// // check if blocks from the disjoint set can be imported on their on forks
// // given that fragment contains chains and these chains contains blocks
// // check if the first block in the chain contains a parent known by us
// for _, fragment := range disjointFragments {
// validFragment := validBlocksUnderFragment(highestFinalized.Number, fragment)
// if len(validFragment) == 0 {
// continue
// }
//
// ok, err := f.blockState.HasHeader(validFragment[0].Header.ParentHash)
// if err != nil && !errors.Is(err, database.ErrNotFound) {
// return false, nil, nil, err
// }
//
// if !ok {
// // if the parent of this valid fragment is behind our latest finalized number
// // then we can discard the whole fragment since it is a invalid fork
// if (validFragment[0].Header.Number - 1) <= highestFinalized.Number {
// continue
// }
//
// logger.Infof("starting an acestor search from %s parent of #%d (%s)",
// validFragment[0].Header.ParentHash,
// validFragment[0].Header.Number,
// validFragment[0].Header.Hash(),
// )
//
// f.unreadyBlocks.newDisjointFragment(validFragment)
// request := messages.NewBlockRequest(
// *messages.NewFromBlock(validFragment[0].Header.ParentHash),
// messages.MaxBlocksInResponse,
// messages.BootstrapRequestData, messages.Descending)
// f.requestQueue.PushBack(request)
// } else {
// // inserting them in the queue to be processed after the main chain
// nextBlocksToImport = append(nextBlocksToImport, validFragment...)
// }
// }
//
// disjointFragments = nil
// }
//
// f.unreadyBlocks.removeIrrelevantFragments(highestFinalized.Number)
// return false, repChanges, peersToIgnore, nil
//}

func (f *FullSyncStrategy) ShowMetrics() {
totalSyncAndImportSeconds := time.Since(f.startedAt).Seconds()
bps := float64(f.syncedBlocks) / totalSyncAndImportSeconds
@@ -536,7 +414,7 @@ func validateResult(result TaskResult, badBlocks []string) (repChange *Change,
return
}

task, ok := result.Task.(*SyncTask)
task, ok := result.Task.(*syncTask)
if !ok {
logger.Warnf("skipping unexpected task type in TaskResult: %T", result.Task)
return
@@ -604,95 +482,6 @@ func validateResult(result TaskResult, badBlocks []string) (repChange *Change,
return
}

// func validateResults(results []TaskResult, badBlocks []string) (repChanges []Change,
//
// peersToBlock []peer.ID, validRes []RequestResponseData) {
//
// repChanges = make([]Change, 0)
// peersToBlock = make([]peer.ID, 0)
// validRes = make([]RequestResponseData, 0, len(results))
//
// resultLoop:
//
// for _, result := range results {
// task, ok := result.Task.(*SyncTask)
// if !ok {
// logger.Warnf("skipping unexpected task type in TaskResult: %T", result.Task)
// continue
// }
//
// request := task.request.(*messages.BlockRequestMessage)
//
// if !result.Completed {
// continue
// }
//
// response := result.Result.(*messages.BlockResponseMessage)
// if request.Direction == messages.Descending {
// // reverse blocks before pre-validating and placing in ready queue
// slices.Reverse(response.BlockData)
// }
//
// err := validateResponseFields(request, response.BlockData)
// if err != nil {
// logger.Warnf("validating fields: %s", err)
// // TODO: check the reputation change for nil body in response
// // and nil justification in response
// if errors.Is(err, errNilHeaderInResponse) {
// repChanges = append(repChanges, Change{
// who: result.Who,
// rep: peerset.ReputationChange{
// Value: peerset.IncompleteHeaderValue,
// Reason: peerset.IncompleteHeaderReason,
// },
// })
// }
//
// continue
// }
//
// // only check if the responses forms a chain if the response contains the headers
// // of each block, othewise the response might only have the body/justification for
// // a block
// if request.RequestField(messages.RequestedDataHeader) && !isResponseAChain(response.BlockData) {
// logger.Warnf("response from %s is not a chain", result.Who)
// repChanges = append(repChanges, Change{
// who: result.Who,
// rep: peerset.ReputationChange{
// Value: peerset.IncompleteHeaderValue,
// Reason: peerset.IncompleteHeaderReason,
// },
// })
// continue
// }
//
// for _, block := range response.BlockData {
// if slices.Contains(badBlocks, block.Hash.String()) {
// logger.Warnf("%s sent a known bad block: #%d (%s)",
// result.Who, block.Number(), block.Hash.String())
//
// peersToBlock = append(peersToBlock, result.Who)
// repChanges = append(repChanges, Change{
// who: result.Who,
// rep: peerset.ReputationChange{
// Value: peerset.BadBlockAnnouncementValue,
// Reason: peerset.BadBlockAnnouncementReason,
// },
// })
//
// continue resultLoop
// }
// }
//
// validRes = append(validRes, RequestResponseData{
// req: request,
// responseData: response.BlockData,
// })
// }
//
// return repChanges, peersToBlock, validRes
// }
//
// sortFragmentsOfChain will organise the fragments
// in a way we can import the older blocks first also guaranting that
// forks can be imported by organising them to be after the main chain
19 changes: 9 additions & 10 deletions dot/sync/fullsync_test.go
@@ -68,8 +68,7 @@ func TestFullSyncNextActions(t *testing.T) {
task, err := fs.NextActions()
require.NoError(t, err)

require.Len(t, task, int(maxRequestsAllowed))
request := task[0].(*SyncTask).request.(*messages.BlockRequestMessage)
request := task[0].(*syncTask).request.(*messages.BlockRequestMessage)
require.Equal(t, uint(1), request.StartingBlock.RawValue())
require.Equal(t, uint32(128), *request.Max)
})
@@ -170,7 +169,7 @@ func TestFullSyncNextActions(t *testing.T) {
task, err := fs.NextActions()
require.NoError(t, err)

require.Equal(t, task[0].(*SyncTask).request, tt.expectedTasks[0])
require.Equal(t, task[0].(*syncTask).request, tt.expectedTasks[0])
require.Equal(t, fs.requestQueue.Len(), tt.expectedQueueLen)
})
}
@@ -199,7 +198,7 @@ func TestFullSyncProcess(t *testing.T) {
// 1 -> 10
{
Who: peer.ID("peerA"),
Task: &SyncTask{
Task: &syncTask{
request: messages.NewBlockRequest(*messages.NewFromBlock(uint(1)), 127,
messages.BootstrapRequestData, messages.Ascending),
requestMaker: requestMaker,
@@ -212,7 +211,7 @@ func TestFullSyncProcess(t *testing.T) {
// 129 -> 256
{
Who: peer.ID("peerA"),
Task: &SyncTask{
Task: &syncTask{
request: messages.NewBlockRequest(*messages.NewFromBlock(uint(129)), 127,
messages.BootstrapRequestData, messages.Ascending),
requestMaker: requestMaker,
@@ -229,17 +228,17 @@ func TestFullSyncProcess(t *testing.T) {

mockBlockState.EXPECT().GetHighestFinalisedHeader().
Return(genesisHeader, nil).
Times(2)
Times(5)

mockBlockState.EXPECT().
HasHeader(fstTaskBlockResponse.BlockData[0].Header.ParentHash).
Return(true, nil).
Times(1)
Times(2)

mockBlockState.EXPECT().
HasHeader(sndTaskBlockResponse.BlockData[0].Header.ParentHash).
Return(false, nil).
Times(1)
Times(2)

mockImporter := NewMockimporter(ctrl)
mockImporter.EXPECT().
@@ -287,7 +286,7 @@ func TestFullSyncProcess(t *testing.T) {
// ancestor search task
// 128 -> 1
Who: peer.ID("peerA"),
Task: &SyncTask{
Task: &syncTask{
request: expectedAncestorRequest,
requestMaker: requestMaker,
},
@@ -470,7 +469,7 @@ func TestFullSyncBlockAnnounce(t *testing.T) {

requests := make([]messages.P2PMessage, len(tasks))
for idx, task := range tasks {
requests[idx] = task.(*SyncTask).request
requests[idx] = task.(*syncTask).request
}

block17 := types.NewHeader(announceOfBlock17.ParentHash,