Skip to content

Commit

Permalink
Merge pull request #134 from bro-n-bro/131-sometime-crawler-fails-wit…
Browse files Browse the repository at this point in the history
…h-panic

131 panic on parsing addresses
  • Loading branch information
malekvictor authored Nov 6, 2023
2 parents 10c1eee + 1d74813 commit 4b8b273
Show file tree
Hide file tree
Showing 4 changed files with 17 additions and 11 deletions.
5 changes: 5 additions & 0 deletions modules/core/account_parser.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,11 @@ type MessageAddressesParser = func(cdc codec.Codec, msg sdk.Msg) []string
// JoinMessageParsers joins together all the given parsers, calling them in order
func JoinMessageParsers(parsers ...MessageAddressesParser) MessageAddressesParser {
return func(cdc codec.Codec, msg sdk.Msg) []string {
// https://github.com/bro-n-bro/spacebox-crawler/issues/131
if msg == nil {
return nil
}

for _, parser := range parsers {
// Try getting the addresses

Expand Down
3 changes: 2 additions & 1 deletion pkg/worker/process.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"encoding/json"
"errors"
"fmt"
"reflect"
"time"

cometbftcoreypes "github.com/cometbft/cometbft/rpc/core/types"
Expand Down Expand Up @@ -318,7 +319,7 @@ func (w *Worker) processMessage(ctx context.Context, msg *codec.Any, tx *types.T
}

// message is not supported. skip it
if stdMsg == nil {
if stdMsg == nil || reflect.ValueOf(stdMsg).IsNil() {
return nil
}

Expand Down
15 changes: 10 additions & 5 deletions pkg/worker/queue.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,9 +28,8 @@ func (w *Worker) enqueueHeight(ctx context.Context, wg *sync.WaitGroup, startHei
case <-ctx.Done():
w.log.Info().Msg("stop enqueueHeight")
return
default:
case w.heightCh <- height: // put height to channel for processing the block
}
w.heightCh <- height // put height to channel for processing the block
}
}

Expand All @@ -50,9 +49,16 @@ func (w *Worker) enqueueNewBlocks(ctx context.Context, eventCh <-chan cometbftco
w.log.Warn().Msg("failed to cast ws event to EventDataNewBlock type")
continue
}

height := newBlock.Block.Header.Height
w.log.Info().Int64("height", height).Msg("enqueueing new block")
w.heightCh <- height

select {
case <-ctx.Done():
w.log.Info().Msg("stop new block listener")
return
case w.heightCh <- height:
}
}
}
}
Expand Down Expand Up @@ -84,9 +90,8 @@ func (w *Worker) enqueueErrorBlocks(ctx context.Context, wg *sync.WaitGroup) {
case <-ctx.Done():
w.log.Info().Msg("stop GetErrorBlockHeights")
return
default:
case w.heightCh <- height:
}
w.heightCh <- height
}
}
}
Expand Down
5 changes: 0 additions & 5 deletions pkg/worker/worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@ import (
"fmt"
"sync"
"syscall"
"time"

"github.com/cosmos/cosmos-sdk/codec"
"github.com/prometheus/client_golang/prometheus"
Expand Down Expand Up @@ -187,10 +186,6 @@ func (w *Worker) Stop(_ context.Context) error {
w.stopWsListener()
}

t := time.NewTicker(2 * time.Second)
defer t.Stop()
<-t.C // XXX save from send to closed channel

close(w.heightCh)
w.wg.Wait()
w.stopProcessing()
Expand Down

0 comments on commit 4b8b273

Please sign in to comment.