Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Console Reader #6

Merged
merged 4 commits into from
Nov 6, 2024
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion cmd/firefil/main.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package main

import (
"github.com/chainsafe/firehose-filecoin/console_reader"
pbfilecoin "github.com/chainsafe/firehose-filecoin/pb/sf/filecoin/type/v1"
firecore "github.com/streamingfast/firehose-core"
fhCmd "github.com/streamingfast/firehose-core/cmd"
Expand All @@ -17,7 +18,7 @@ func main() {
FirstStreamableBlock: 1,

BlockFactory: func() firecore.Block { return new(pbfilecoin.Block) },
ConsoleReaderFactory: firecore.NewConsoleReader,
ConsoleReaderFactory: console_reader.NewConsoleReader,

Tools: &firecore.ToolsConfig[*pbfilecoin.Block]{},
})
Expand Down
300 changes: 300 additions & 0 deletions console_reader/console_reader.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,300 @@
package console_reader

import (
"fmt"
"github.com/ipfs/go-cid"
firecore "github.com/streamingfast/firehose-core"
"io"
"strconv"
"strings"
"sync"
"time"

"github.com/streamingfast/bstream"
pbbstream "github.com/streamingfast/bstream/pb/sf/bstream/v1"
"github.com/streamingfast/dmetrics"
"github.com/streamingfast/firehose-core/node-manager/mindreader"
"github.com/streamingfast/logging"
"go.uber.org/zap"
"google.golang.org/protobuf/types/known/anypb"
"google.golang.org/protobuf/types/known/timestamppb"
)

const FirePrefix = "FIRE "
const FirePrefixLen = len(FirePrefix)
const InitLogPrefix = "INIT "
const InitLogPrefixLen = len(InitLogPrefix)
const BlockLogPrefix = "BLOCK "
const BlockLogPrefixLen = len(BlockLogPrefix)

type ParsingStats struct {
}

type ConsoleReader struct {
lines chan string
done chan interface{}
closeOnce sync.Once
logger *zap.Logger
tracer logging.Tracer

// Parsing context
readerProtocolVersion string
protoMessageType string
lastBlock bstream.BlockRef
lastParentBlock bstream.BlockRef
lastBlockTimestamp time.Time

lib uint64

blockRate *dmetrics.AvgRatePromCounter
}

func NewConsoleReader(lines chan string, blockEncoder firecore.BlockEncoder, logger *zap.Logger, tracer logging.Tracer) (mindreader.ConsolerReader, error) {
reader := newConsoleReader(lines, logger, tracer)

delayBetweenStats := 30 * time.Second
if tracer.Enabled() {
delayBetweenStats = 5 * time.Second
}

go func() {
defer reader.blockRate.Stop()

for {
select {
case <-reader.done:
return
case <-time.After(delayBetweenStats):
reader.printStats()
}
}
}()

return reader, nil
}

func newConsoleReader(lines chan string, logger *zap.Logger, tracer logging.Tracer) *ConsoleReader {
return &ConsoleReader{
lines: lines,
done: make(chan interface{}),
logger: logger,
tracer: tracer,

blockRate: dmetrics.MustNewAvgRateFromPromCounter(ConsoleReaderBlockReadCount, 1*time.Second, 30*time.Second, "blocks"),
}
}

func (r *ConsoleReader) Done() <-chan interface{} {
return r.done
}

func (r *ConsoleReader) Close() error {
r.closeOnce.Do(func() {
r.blockRate.SyncNow()
r.printStats()

r.logger.Info("console reader done")
close(r.done)
})

return nil
}

type blockRefView struct {
ref bstream.BlockRef
}

func (v blockRefView) String() string {
if v.ref == nil {
return "<unset>"
}

return v.ref.String()
}

type blockRefViewTimestamp struct {
ref bstream.BlockRef
timestamp time.Time
}

func (v blockRefViewTimestamp) String() string {
return fmt.Sprintf("%s @ %s", blockRefView{v.ref}, v.timestamp.Local().Format(time.RFC822Z))
}

func (r *ConsoleReader) printStats() {
r.logger.Info("console reader stats",
zap.Stringer("block_rate", r.blockRate),
zap.Stringer("last_block", blockRefViewTimestamp{r.lastBlock, r.lastBlockTimestamp}),
zap.Stringer("last_parent_block", blockRefView{r.lastParentBlock}),
zap.Uint64("lib", r.lib),
)
}

func (r *ConsoleReader) ReadBlock() (out *pbbstream.Block, err error) {
out, err = r.next()
if err != nil {
return nil, err
}

return out, nil
}

func (r *ConsoleReader) next() (out *pbbstream.Block, err error) {
for line := range r.lines {
if !strings.HasPrefix(line, "FIRE ") {
continue
}

line = line[FirePrefixLen:]

switch {
case strings.HasPrefix(line, BlockLogPrefix):
out, err = r.readBlock(line[BlockLogPrefixLen:])

case strings.HasPrefix(line, InitLogPrefix):
err = r.readInit(line[InitLogPrefixLen:])
default:
if r.tracer.Enabled() {
r.logger.Debug("skipping unknown Firehose log line", zap.String("line", line))
}
continue
}

if err != nil {
chunks := strings.SplitN(line, " ", 2)
return nil, fmt.Errorf("%s: %s (line %q)", chunks[0], err, line)
}

if out != nil {
return out, nil
}
}

r.Close()

return nil, io.EOF
}

// Formats
// [READER_PROTOCOL_VERSION] sf.ethereum.type.v2.Block
func (r *ConsoleReader) readInit(line string) error {
chunks, err := splitInBoundedChunks(line, 2)
if err != nil {
return fmt.Errorf("split: %s", err)
}

r.readerProtocolVersion = chunks[0]

switch r.readerProtocolVersion {
// Implementation of RPC poller were set to use 1.0 so we keep support for it for now
case "1.0", "3.0":
// Supported
default:
return fmt.Errorf("major version of Firehose exchange protocol is unsupported (expected: one of [1.0, 3.0], found %s), you are most probably running an incompatible version of the Firehose aware node client/node poller", r.readerProtocolVersion)
}

protobufFullyQualifiedName := chunks[1]
if protobufFullyQualifiedName == "" {
return fmt.Errorf("protobuf fully qualified name is empty, it must be set to a valid Protobuf fully qualified message type representing your block format")
}

r.setProtoMessageType(protobufFullyQualifiedName)

r.logger.Info("console reader protocol version init",
zap.String("version", r.readerProtocolVersion),
zap.String("protobuf_fully_qualified_name", protobufFullyQualifiedName),
)

return nil
}

// Formats
// [block_num:342342342] [block_hash] [parent_num] [parent_hash] [lib:123123123] [timestamp:unix_nano] B64ENCODED_any
func (r *ConsoleReader) readBlock(line string) (out *pbbstream.Block, err error) {
if r.readerProtocolVersion == "" {
return nil, fmt.Errorf("reader protocol version not set, did you forget to send the 'FIRE INIT <reader_protocol_version> <protobuf_fully_qualified_type>' line?")
}

chunks, err := splitInBoundedChunks(line, 5)
if err != nil {
return nil, fmt.Errorf("splitting block log line: %w", err)
}

height, err := strconv.ParseUint(chunks[0], 10, 64)
if err != nil {
return nil, fmt.Errorf("parsing height %q: %w", chunks[0], err)
}

blockCount, err := strconv.ParseUint(chunks[1], 10, 64)
if err != nil {
return nil, fmt.Errorf("parsing block count %q: %w", chunks[1], err)
}

timestampUnixNano, err := strconv.ParseUint(chunks[2], 10, 64)
if err != nil {
return nil, fmt.Errorf("parsing timestamp %q: %w", chunks[2], err)
}

timestamp := time.Unix(0, int64(timestampUnixNano))

tipsetCID, err := cid.Decode(chunks[3])
if err != nil {
return nil, fmt.Errorf("parsing tipset CID %q: %w", chunks[3], err)
}

parentTipsetCID, err := cid.Decode(chunks[4])
if err != nil {
return nil, fmt.Errorf("parsing parent tipset CID %q: %w", chunks[4], err)
}

parentNum := height - 1

libNum := uint64(1)

blockPayload := &anypb.Any{
TypeUrl: r.protoMessageType,
Value: []byte{byte(blockCount)},
}

block := &pbbstream.Block{
Id: tipsetCID.String(),
Number: height,
ParentId: parentTipsetCID.String(),
ParentNum: parentNum,
Timestamp: timestamppb.New(timestamp),
LibNum: 1,
Payload: blockPayload,
}

ConsoleReaderBlockReadCount.Inc()
r.lastBlock = bstream.NewBlockRef(tipsetCID.String(), height)
r.lastParentBlock = bstream.NewBlockRef(parentTipsetCID.String(), parentNum)
r.lastBlockTimestamp = timestamp
r.lib = libNum

return block, nil
}

func (r *ConsoleReader) setProtoMessageType(typeURL string) {
if strings.HasPrefix(typeURL, "type.googleapis.com/") {
r.protoMessageType = typeURL
return
}

if strings.Contains(typeURL, "/") {
panic(fmt.Sprintf("invalid type url %q, expecting type.googleapis.com/", typeURL))
}

r.protoMessageType = "type.googleapis.com/" + typeURL
}

// splitInBoundedChunks splits the line in `count` chunks and returns the slice `chunks[1:count]` (so exclusive end),
// but will accumulate all trailing chunks within the last (for free-form strings, or JSON objects)
func splitInBoundedChunks(line string, count int) ([]string, error) {
chunks := strings.SplitN(line, " ", count)
if len(chunks) != count {
return nil, fmt.Errorf("%d fields required but found %d fields for line %q", count, len(chunks), line)
}

return chunks, nil
}
Loading
Loading