Skip to content

Commit

Permalink
Console Reader (#6)
Browse files Browse the repository at this point in the history
* Add modified ConsoleReader

* Fix imports

* Adds tests, update cmd
  • Loading branch information
ansermino authored Nov 6, 2024
1 parent 57bda4e commit dd40d0e
Show file tree
Hide file tree
Showing 4 changed files with 396 additions and 1 deletion.
3 changes: 2 additions & 1 deletion cmd/firefil/main.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package main

import (
"github.com/chainsafe/firehose-filecoin/console_reader"
pbfilecoin "github.com/chainsafe/firehose-filecoin/pb/sf/filecoin/type/v1"
firecore "github.com/streamingfast/firehose-core"
fhCmd "github.com/streamingfast/firehose-core/cmd"
Expand All @@ -17,7 +18,7 @@ func main() {
FirstStreamableBlock: 1,

BlockFactory: func() firecore.Block { return new(pbfilecoin.Tipset) },
ConsoleReaderFactory: firecore.NewConsoleReader,
ConsoleReaderFactory: console_reader.NewConsoleReader,

Tools: &firecore.ToolsConfig[*pbfilecoin.Tipset]{},
})
Expand Down
300 changes: 300 additions & 0 deletions console_reader/console_reader.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,300 @@
package console_reader

import (
"fmt"
"github.com/ipfs/go-cid"
firecore "github.com/streamingfast/firehose-core"
"io"
"strconv"
"strings"
"sync"
"time"

"github.com/streamingfast/bstream"
pbbstream "github.com/streamingfast/bstream/pb/sf/bstream/v1"
"github.com/streamingfast/dmetrics"
"github.com/streamingfast/firehose-core/node-manager/mindreader"
"github.com/streamingfast/logging"
"go.uber.org/zap"
"google.golang.org/protobuf/types/known/anypb"
"google.golang.org/protobuf/types/known/timestamppb"
)

const FirePrefix = "FIRE "
const FirePrefixLen = len(FirePrefix)
const InitLogPrefix = "INIT "
const InitLogPrefixLen = len(InitLogPrefix)
const BlockLogPrefix = "BLOCK "
const BlockLogPrefixLen = len(BlockLogPrefix)

type ParsingStats struct {
}

type ConsoleReader struct {
lines chan string
done chan interface{}
closeOnce sync.Once
logger *zap.Logger
tracer logging.Tracer

// Parsing context
readerProtocolVersion string
protoMessageType string
lastBlock bstream.BlockRef
lastParentBlock bstream.BlockRef
lastBlockTimestamp time.Time

lib uint64

blockRate *dmetrics.AvgRatePromCounter
}

func NewConsoleReader(lines chan string, blockEncoder firecore.BlockEncoder, logger *zap.Logger, tracer logging.Tracer) (mindreader.ConsolerReader, error) {
reader := newConsoleReader(lines, logger, tracer)

delayBetweenStats := 30 * time.Second
if tracer.Enabled() {
delayBetweenStats = 5 * time.Second
}

go func() {
defer reader.blockRate.Stop()

for {
select {
case <-reader.done:
return
case <-time.After(delayBetweenStats):
reader.printStats()
}
}
}()

return reader, nil
}

func newConsoleReader(lines chan string, logger *zap.Logger, tracer logging.Tracer) *ConsoleReader {
return &ConsoleReader{
lines: lines,
done: make(chan interface{}),
logger: logger,
tracer: tracer,

blockRate: dmetrics.MustNewAvgRateFromPromCounter(ConsoleReaderBlockReadCount, 1*time.Second, 30*time.Second, "blocks"),
}
}

func (r *ConsoleReader) Done() <-chan interface{} {
return r.done
}

func (r *ConsoleReader) Close() error {
r.closeOnce.Do(func() {
r.blockRate.SyncNow()
r.printStats()

r.logger.Info("console reader done")
close(r.done)
})

return nil
}

type blockRefView struct {
ref bstream.BlockRef
}

func (v blockRefView) String() string {
if v.ref == nil {
return "<unset>"
}

return v.ref.String()
}

type blockRefViewTimestamp struct {
ref bstream.BlockRef
timestamp time.Time
}

func (v blockRefViewTimestamp) String() string {
return fmt.Sprintf("%s @ %s", blockRefView{v.ref}, v.timestamp.Local().Format(time.RFC822Z))
}

func (r *ConsoleReader) printStats() {
r.logger.Info("console reader stats",
zap.Stringer("block_rate", r.blockRate),
zap.Stringer("last_block", blockRefViewTimestamp{r.lastBlock, r.lastBlockTimestamp}),
zap.Stringer("last_parent_block", blockRefView{r.lastParentBlock}),
zap.Uint64("lib", r.lib),
)
}

func (r *ConsoleReader) ReadBlock() (out *pbbstream.Block, err error) {
out, err = r.next()
if err != nil {
return nil, err
}

return out, nil
}

func (r *ConsoleReader) next() (out *pbbstream.Block, err error) {
for line := range r.lines {
if !strings.HasPrefix(line, "FIRE ") {
continue
}

line = line[FirePrefixLen:]

switch {
case strings.HasPrefix(line, BlockLogPrefix):
out, err = r.readBlock(line[BlockLogPrefixLen:])

case strings.HasPrefix(line, InitLogPrefix):
err = r.readInit(line[InitLogPrefixLen:])
default:
if r.tracer.Enabled() {
r.logger.Debug("skipping unknown Firehose log line", zap.String("line", line))
}
continue
}

if err != nil {
chunks := strings.SplitN(line, " ", 2)
return nil, fmt.Errorf("%s: %s (line %q)", chunks[0], err, line)
}

if out != nil {
return out, nil
}
}

r.Close()

return nil, io.EOF
}

// Formats
// [READER_PROTOCOL_VERSION] sf.ethereum.type.v2.Block
func (r *ConsoleReader) readInit(line string) error {
chunks, err := splitInBoundedChunks(line, 2)
if err != nil {
return fmt.Errorf("split: %s", err)
}

r.readerProtocolVersion = chunks[0]

switch r.readerProtocolVersion {
// Implementation of RPC poller were set to use 1.0 so we keep support for it for now
case "1.0", "3.0":
// Supported
default:
return fmt.Errorf("major version of Firehose exchange protocol is unsupported (expected: one of [1.0, 3.0], found %s), you are most probably running an incompatible version of the Firehose aware node client/node poller", r.readerProtocolVersion)
}

protobufFullyQualifiedName := chunks[1]
if protobufFullyQualifiedName == "" {
return fmt.Errorf("protobuf fully qualified name is empty, it must be set to a valid Protobuf fully qualified message type representing your block format")
}

r.setProtoMessageType(protobufFullyQualifiedName)

r.logger.Info("console reader protocol version init",
zap.String("version", r.readerProtocolVersion),
zap.String("protobuf_fully_qualified_name", protobufFullyQualifiedName),
)

return nil
}

// Formats
// [block_num:342342342] [block_hash] [parent_num] [parent_hash] [lib:123123123] [timestamp:unix_nano] B64ENCODED_any
func (r *ConsoleReader) readBlock(line string) (out *pbbstream.Block, err error) {
if r.readerProtocolVersion == "" {
return nil, fmt.Errorf("reader protocol version not set, did you forget to send the 'FIRE INIT <reader_protocol_version> <protobuf_fully_qualified_type>' line?")
}

chunks, err := splitInBoundedChunks(line, 5)
if err != nil {
return nil, fmt.Errorf("splitting block log line: %w", err)
}

height, err := strconv.ParseUint(chunks[0], 10, 64)
if err != nil {
return nil, fmt.Errorf("parsing height %q: %w", chunks[0], err)
}

blockCount, err := strconv.ParseUint(chunks[1], 10, 64)
if err != nil {
return nil, fmt.Errorf("parsing block count %q: %w", chunks[1], err)
}

timestampUnixNano, err := strconv.ParseUint(chunks[2], 10, 64)
if err != nil {
return nil, fmt.Errorf("parsing timestamp %q: %w", chunks[2], err)
}

timestamp := time.Unix(0, int64(timestampUnixNano))

tipsetCID, err := cid.Decode(chunks[3])
if err != nil {
return nil, fmt.Errorf("parsing tipset CID %q: %w", chunks[3], err)
}

parentTipsetCID, err := cid.Decode(chunks[4])
if err != nil {
return nil, fmt.Errorf("parsing parent tipset CID %q: %w", chunks[4], err)
}

parentNum := height - 1

libNum := uint64(1)

blockPayload := &anypb.Any{
TypeUrl: r.protoMessageType,
Value: []byte{byte(blockCount)},
}

block := &pbbstream.Block{
Id: tipsetCID.String(),
Number: height,
ParentId: parentTipsetCID.String(),
ParentNum: parentNum,
Timestamp: timestamppb.New(timestamp),
LibNum: 1,
Payload: blockPayload,
}

ConsoleReaderBlockReadCount.Inc()
r.lastBlock = bstream.NewBlockRef(tipsetCID.String(), height)
r.lastParentBlock = bstream.NewBlockRef(parentTipsetCID.String(), parentNum)
r.lastBlockTimestamp = timestamp
r.lib = libNum

return block, nil
}

func (r *ConsoleReader) setProtoMessageType(typeURL string) {
if strings.HasPrefix(typeURL, "type.googleapis.com/") {
r.protoMessageType = typeURL
return
}

if strings.Contains(typeURL, "/") {
panic(fmt.Sprintf("invalid type url %q, expecting type.googleapis.com/", typeURL))
}

r.protoMessageType = "type.googleapis.com/" + typeURL
}

// splitInBoundedChunks splits the line in `count` chunks and returns the slice `chunks[1:count]` (so exclusive end),
// but will accumulate all trailing chunks within the last (for free-form strings, or JSON objects)
func splitInBoundedChunks(line string, count int) ([]string, error) {
chunks := strings.SplitN(line, " ", count)
if len(chunks) != count {
return nil, fmt.Errorf("%d fields required but found %d fields for line %q", count, len(chunks), line)
}

return chunks, nil
}
Loading

0 comments on commit dd40d0e

Please sign in to comment.