WIP
haikoschol committed Nov 6, 2024
1 parent 78c0484 commit b64d894
Showing 5 changed files with 83 additions and 46 deletions.
7 changes: 5 additions & 2 deletions dot/sync/configuration.go
@@ -7,6 +7,8 @@ import (
"time"
)

const defaultNoPeersRetryDelay = time.Second * 10

type ServiceConfig func(svc *SyncService)

func WithStrategies(currentStrategy, defaultStrategy Strategy) ServiceConfig {
@@ -21,8 +23,9 @@ func WithStrategies(currentStrategy, defaultStrategy Strategy) ServiceConfig {
wpCapacity *= 2 // add some buffer

svc.workerPool = NewWorkerPool(WorkerPoolConfig{
Capacity: wpCapacity,
MaxRetries: UnlimitedRetries,
Capacity: wpCapacity,
MaxRetries: UnlimitedRetries,
NoPeersRetryDelay: defaultNoPeersRetryDelay,
})
}
}
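The new NoPeersRetryDelay field controls how long a task that failed with ErrNoPeers waits before being retried; WithStrategies wires in the 10-second default defined above. A rough usage sketch (not part of this commit) of constructing a pool directly with a custom delay — the capacity value and the override are illustrative only:

wp := NewWorkerPool(WorkerPoolConfig{
	Capacity:          128,             // illustrative value
	MaxRetries:        UnlimitedRetries,
	NoPeersRetryDelay: 2 * time.Second, // retry sooner than the 10s default
})
defer wp.Shutdown()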
2 changes: 2 additions & 0 deletions dot/sync/fullsync.go
@@ -182,7 +182,9 @@ func (f *FullSyncStrategy) Process(results <-chan TaskResult) (
// This is safe as long as we are the only goroutine reading from the channel.
for len(results) > 0 {
readyBlocks := make([][]*types.BlockData, 0)
logger.Info("FullSyncStrategy.Process(): consuming from results channel...") // TODO: remove
result := <-results
logger.Info("FullSyncStrategy.Process(): consumed from results channel") // TODO: remove
repChange, ignorePeer, validResp := validateResult(result, f.badBlocks)

if repChange != nil {
4 changes: 2 additions & 2 deletions dot/sync/service.go
@@ -24,7 +24,6 @@ import (
const (
waitPeersDefaultTimeout = 10 * time.Second
minPeersDefault = 1
maxTaskRetries = 5
)

var (
@@ -175,7 +174,6 @@ func (s *SyncService) Stop() error {

func (s *SyncService) HandleBlockAnnounceHandshake(from peer.ID, msg *network.BlockAnnounceHandshake) error {
logger.Infof("receiving a block announce handshake from %s", from.String())
logger.Infof("len(s.workerPool.Results())=%d", len(s.workerPool.Results())) // TODO: remove
if err := s.workerPool.AddPeer(from); err != nil {
logger.Warnf("failed to add peer to worker pool: %s", err)
return err
@@ -288,6 +286,8 @@ func (s *SyncService) runStrategy() {
}

_ = s.workerPool.SubmitBatch(tasks)
} else {
logger.Info("SyncService.runStrategy(): worker pool is at capacity, not asking for more tasks") // TODO: remove
}

done, repChanges, peersToIgnore, err := s.currentStrategy.Process(s.workerPool.Results())
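Taken together with the SubmitBatch call above, runStrategy now only asks the strategy for more work while the pool has spare capacity, and otherwise just drains results. A hedged sketch of that loop shape — poolHasSpareCapacity, nextTasks, handleReputationChanges and ignorePeers are hypothetical stand-ins, since the real checks and handlers are not shown in this diff:

for {
	if poolHasSpareCapacity() { // hypothetical helper; the real condition is not shown here
		_ = s.workerPool.SubmitBatch(nextTasks()) // nextTasks is likewise a stand-in
	} else {
		logger.Info("worker pool is at capacity, not asking for more tasks")
	}

	// Process whatever results have accumulated, then loop.
	done, repChanges, peersToIgnore, err := s.currentStrategy.Process(s.workerPool.Results())
	if err != nil {
		logger.Warnf("processing results: %s", err)
	}
	s.handleReputationChanges(repChanges) // hypothetical; actual handling not shown
	s.ignorePeers(peersToIgnore)          // hypothetical; actual handling not shown
	if done {
		return
	}
}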
92 changes: 56 additions & 36 deletions dot/sync/worker_pool.go
@@ -8,12 +8,9 @@ import (
"context"
"errors"
"fmt"
"io"
"sync"
"time"

"github.com/ChainSafe/gossamer/dot/network"

"github.com/libp2p/go-libp2p/core/peer"
)

@@ -87,8 +84,9 @@ type WorkerPool interface {
}

type WorkerPoolConfig struct {
Capacity int
MaxRetries int
Capacity int
MaxRetries int
NoPeersRetryDelay time.Duration
}

// NewWorkerPool creates a new worker pool with the given configuration.
@@ -101,6 +99,7 @@ func NewWorkerPool(cfg WorkerPoolConfig) WorkerPool {

return &workerPool{
maxRetries: cfg.MaxRetries,
retryDelay: cfg.NoPeersRetryDelay,
ignoredPeers: make(map[peer.ID]struct{}),
statuses: make(map[BatchID]BatchStatus),
resChan: make(chan TaskResult, cfg.Capacity),
@@ -114,6 +113,7 @@ type workerPool struct {
wg sync.WaitGroup

maxRetries int
retryDelay time.Duration
peers list.List
ignoredPeers map[peer.ID]struct{}
statuses map[BatchID]BatchStatus
@@ -170,19 +170,7 @@ func (w *workerPool) AddPeer(who peer.ID) error {
w.mtx.Lock()
defer w.mtx.Unlock()

if _, ok := w.ignoredPeers[who]; ok {
return ErrPeerIgnored
}

for e := w.peers.Front(); e != nil; e = e.Next() {
if e.Value.(peer.ID) == who {
return nil
}
}

w.peers.PushBack(who)
logger.Tracef("peer added, total in the pool %d", w.peers.Len())
return nil
return w.addPeer(who)
}

// RemovePeer removes a peer from the worker pool.
@@ -213,6 +201,7 @@ func (w *workerPool) NumPeers() int {
// Shutdown stops the worker pool and waits for all tasks to complete.
func (w *workerPool) Shutdown() {
w.cancel()
close(w.resChan)
w.wg.Wait()
}
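Closing resChan in Shutdown lets a consumer of Results() use a plain range loop and terminate once the pool is done (assuming Results() exposes this channel, as its use in service.go suggests). A minimal consumer sketch, not part of this commit:

go func() {
	for tr := range wp.Results() { // loop ends when Shutdown closes the channel
		if tr.Error != nil {
			logger.Debugf("task %s failed after %d retries: %s", tr.Task.String(), tr.Retries, tr.Error)
			continue
		}
		logger.Debugf("task %s completed by peer %s", tr.Task.String(), tr.Who)
	}
}()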

@@ -230,6 +219,7 @@ func (w *workerPool) executeBatch(tasks []Task, bID BatchID) {
for {
select {
case <-w.ctx.Done():
close(batchResults)
return

case tr := <-batchResults:
@@ -248,13 +238,13 @@ func (w *workerPool) executeTask(task Task, ch chan TaskResult) {

func (w *workerPool) executeTask(task Task, ch chan TaskResult) {
if errors.Is(w.ctx.Err(), context.Canceled) {
logger.Tracef("[CANCELED] task=%s, shutting down", task.String())
logger.Infof("[CANCELED] task=%s, shutting down", task.String()) // TODO: change to debug lvl
return
}

who, err := w.reservePeer()
if errors.Is(err, ErrNoPeers) {
logger.Tracef("no peers available for task=%s", task.String())
logger.Infof("no peers available for task=%s", task.String()) // TODO: change to trace lvl
ch <- TaskResult{Task: task, Error: ErrNoPeers}
return
}
@@ -263,15 +253,11 @@ func (w *workerPool) executeTask(task Task, ch chan TaskResult) {

result, err := task.Do(who)
if err != nil {
logger.Tracef("[FAILED] task=%s peer=%s, err=%s", task.String(), who, err.Error())
logger.Debugf("[FAILED] task=%s peer=%s, err=%s", task.String(), who, err.Error())
} else {
logger.Tracef("[FINISHED] task=%s peer=%s", task.String(), who)
logger.Debugf("[FINISHED] task=%s peer=%s", task.String(), who)
}

w.mtx.Lock()
w.peers.PushBack(who)
w.mtx.Unlock()

ch <- TaskResult{
Task: task,
Who: who,
Expand All @@ -295,6 +281,22 @@ func (w *workerPool) reservePeer() (who peer.ID, err error) {
return peerElement.Value.(peer.ID), nil
}

func (w *workerPool) addPeer(who peer.ID) error {
if _, ok := w.ignoredPeers[who]; ok {
return ErrPeerIgnored
}

for e := w.peers.Front(); e != nil; e = e.Next() {
if e.Value.(peer.ID) == who {
return nil
}
}

w.peers.PushBack(who)
logger.Tracef("peer added, total in the pool %d", w.peers.Len())
return nil
}

func (w *workerPool) removePeer(who peer.ID) {
var toRemove *list.Element
for e := w.peers.Front(); e != nil; e = e.Next() {
@@ -322,33 +324,41 @@ func (w *workerPool) handleSuccessfulTask(tr TaskResult, batchID BatchID) {

tr.Completed = true
w.statuses[batchID].Success[tID] = tr
logger.Infof("handleSuccessfulTask(): len(w.resChan)=%d", len(w.resChan)) // TODO: remove
_ = w.addPeer(tr.Who)

logger.Infof("handleSuccessfulTask(): len(w.resChan)=%d", len(w.resChan)) // TODO: change to trace lvl
w.resChan <- tr
}

func (w *workerPool) handleFailedTask(tr TaskResult, batchID BatchID, batchResults chan TaskResult) {
w.mtx.Lock()
defer w.mtx.Unlock()

if errors.Is(w.ctx.Err(), context.Canceled) {
logger.Infof("handleFailedTask(): worker pool canceled, not handling failed task") // TODO: remove
return
}

logger.Infof("handling failed task err: %v task: %s", tr.Error, tr.Task.ID()) // TODO: remove
delayRetry := false
tID := tr.Task.ID()

if oldTr, ok := w.statuses[batchID].Failed[tID]; ok {
// It is only considered a retry if the task was actually executed.
if !errors.Is(oldTr.Error, ErrNoPeers) {
if errors.Is(oldTr.Error, io.EOF) || errors.Is(oldTr.Error, network.ErrStreamReset) {
w.removePeer(oldTr.Who)
logger.Debugf("removed peer %s from the worker pool", oldTr.Who)
}
// It is only considered a retry if the task was actually executed.
if errors.Is(tr.Error, ErrNoPeers) {
delayRetry = true
} else {
w.ignoredPeers[tr.Who] = struct{}{}

if oldTr, ok := w.statuses[batchID].Failed[tID]; ok {
tr.Retries = oldTr.Retries + 1
tr.Completed = w.maxRetries != UnlimitedRetries && tr.Retries >= w.maxRetries
}
}

tr.Completed = w.maxRetries != UnlimitedRetries && tr.Retries >= w.maxRetries
w.statuses[batchID].Failed[tID] = tr

if tr.Completed {
logger.Infof("handleFailedTask(): len(w.resChan)=%d", len(w.resChan)) // TODO: remove
logger.Infof("handleFailedTask(): len(w.resChan)=%d", len(w.resChan)) // TODO: change to trace lvl
w.resChan <- tr
return
}
@@ -357,6 +367,16 @@ func (w *workerPool) handleFailedTask(tr TaskResult, batchID BatchID, batchResults chan TaskResult) {
w.wg.Add(1)
go func() {
defer w.wg.Done()
if delayRetry {
logger.Infof("delaying retry of task %s", tr.Task.String()) // TODO: remove
timer := time.NewTimer(w.retryDelay)
select {
case <-timer.C:
case <-w.ctx.Done():
logger.Infof("in gorouting, worker pool canceled, not retrying task") // TODO: remove
return
}
}
w.executeTask(tr.Task, batchResults)
}()
}
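The wait added above is the standard timer-versus-context select; a standalone sketch of the same pattern (function name and placement are illustrative, not part of this commit):

// waitOrCancel blocks for delay unless ctx is canceled first; it reports whether
// the caller should proceed with the retry.
func waitOrCancel(ctx context.Context, delay time.Duration) bool {
	timer := time.NewTimer(delay)
	defer timer.Stop() // release the timer if we leave via ctx.Done()
	select {
	case <-timer.C:
		return true
	case <-ctx.Done():
		return false
	}
}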
24 changes: 18 additions & 6 deletions dot/sync/worker_pool_test.go
@@ -55,6 +55,18 @@ func makeTasksAndPeers(num, idOffset int) ([]Task, []peer.ID) {
return tasks, peers
}

func makePool(maxRetries ...int) WorkerPool {
mr := 0
if len(maxRetries) > 0 {
mr = maxRetries[0]
}

return NewWorkerPool(WorkerPoolConfig{
MaxRetries: mr,
NoPeersRetryDelay: time.Millisecond * 10,
})
}

func waitForCompletion(wp WorkerPool, numTasks int) {
resultsReceived := 0

@@ -73,7 +85,7 @@ func TestWorkerPoolHappyPath(t *testing.T) {

var setup = func() (WorkerPool, []Task) {
tasks, peers := makeTasksAndPeers(numTasks, 0)
wp := NewWorkerPool(WorkerPoolConfig{})
wp := makePool()

for _, who := range peers {
err := wp.AddPeer(who)
Expand Down Expand Up @@ -120,7 +132,7 @@ func TestWorkerPoolPeerHandling(t *testing.T) {

t.Run("accepts_batch_without_any_peers", func(t *testing.T) {
tasks, _ := makeTasksAndPeers(numTasks, 0)
wp := NewWorkerPool(WorkerPoolConfig{})
wp := makePool()

wp.SubmitBatch(tasks)

@@ -129,7 +141,7 @@

t.Run("completes_batch_with_fewer_peers_than_tasks", func(t *testing.T) {
tasks, peers := makeTasksAndPeers(numTasks, 0)
wp := NewWorkerPool(WorkerPoolConfig{})
wp := makePool()
assert.NoError(t, wp.AddPeer(peers[0]))
assert.NoError(t, wp.AddPeer(peers[1]))

@@ -145,7 +157,7 @@

t.Run("refuses_to_re_add_ignored_peer", func(t *testing.T) {
_, peers := makeTasksAndPeers(numTasks, 0)
wp := NewWorkerPool(WorkerPoolConfig{})
wp := makePool()

for _, who := range peers {
err := wp.AddPeer(who)
Expand Down Expand Up @@ -178,7 +190,7 @@ func TestWorkerPoolTaskFailures(t *testing.T) {
failTwice.err = taskErr
failTwice.succeedAfter = 2

wp = NewWorkerPool(WorkerPoolConfig{MaxRetries: maxRetries})
wp = makePool(maxRetries)
for _, who := range peers {
err := wp.AddPeer(who)
assert.NoError(t, err)
@@ -232,7 +244,7 @@ func TestWorkerPoolMultipleBatches(t *testing.T) {
b2Tasks, b2Peers := makeTasksAndPeers(b2NumTasks, b1NumTasks)
peers := append(b1Peers, b2Peers...)

wp := NewWorkerPool(WorkerPoolConfig{})
wp := makePool()
for _, who := range peers {
err := wp.AddPeer(who)
assert.NoError(t, err)
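A hypothetical test sketch (not part of this commit) for the new ErrNoPeers retry path, using only the helpers visible above; the timing values are illustrative and assume the 10ms NoPeersRetryDelay set in makePool:

func TestWorkerPoolRetriesWhenPeersArriveLate(t *testing.T) {
	tasks, peers := makeTasksAndPeers(3, 0)
	wp := makePool()

	// No peers yet: every task should fail with ErrNoPeers and be rescheduled
	// after NoPeersRetryDelay instead of being retried in a tight loop.
	wp.SubmitBatch(tasks)

	// Let a couple of delayed retries happen, then provide peers so they can succeed.
	time.Sleep(25 * time.Millisecond)
	for _, who := range peers {
		assert.NoError(t, wp.AddPeer(who))
	}

	waitForCompletion(wp, len(tasks))
	wp.Shutdown()
}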
