diff --git a/cmd/firefil/main.go b/cmd/firefil/main.go index 71bbc7b..a8551ab 100644 --- a/cmd/firefil/main.go +++ b/cmd/firefil/main.go @@ -1,6 +1,7 @@ package main import ( + "github.com/chainsafe/firehose-filecoin/console_reader" pbfilecoin "github.com/chainsafe/firehose-filecoin/pb/sf/filecoin/type/v1" firecore "github.com/streamingfast/firehose-core" fhCmd "github.com/streamingfast/firehose-core/cmd" @@ -17,7 +18,7 @@ func main() { FirstStreamableBlock: 1, BlockFactory: func() firecore.Block { return new(pbfilecoin.Tipset) }, - ConsoleReaderFactory: firecore.NewConsoleReader, + ConsoleReaderFactory: console_reader.NewConsoleReader, Tools: &firecore.ToolsConfig[*pbfilecoin.Tipset]{}, }) diff --git a/console_reader/console_reader.go b/console_reader/console_reader.go new file mode 100644 index 0000000..432ca1d --- /dev/null +++ b/console_reader/console_reader.go @@ -0,0 +1,300 @@ +package console_reader + +import ( + "fmt" + "github.com/ipfs/go-cid" + firecore "github.com/streamingfast/firehose-core" + "io" + "strconv" + "strings" + "sync" + "time" + + "github.com/streamingfast/bstream" + pbbstream "github.com/streamingfast/bstream/pb/sf/bstream/v1" + "github.com/streamingfast/dmetrics" + "github.com/streamingfast/firehose-core/node-manager/mindreader" + "github.com/streamingfast/logging" + "go.uber.org/zap" + "google.golang.org/protobuf/types/known/anypb" + "google.golang.org/protobuf/types/known/timestamppb" +) + +const FirePrefix = "FIRE " +const FirePrefixLen = len(FirePrefix) +const InitLogPrefix = "INIT " +const InitLogPrefixLen = len(InitLogPrefix) +const BlockLogPrefix = "BLOCK " +const BlockLogPrefixLen = len(BlockLogPrefix) + +type ParsingStats struct { +} + +type ConsoleReader struct { + lines chan string + done chan interface{} + closeOnce sync.Once + logger *zap.Logger + tracer logging.Tracer + + // Parsing context + readerProtocolVersion string + protoMessageType string + lastBlock bstream.BlockRef + lastParentBlock bstream.BlockRef + lastBlockTimestamp time.Time + + lib uint64 + + blockRate *dmetrics.AvgRatePromCounter +} + +func NewConsoleReader(lines chan string, blockEncoder firecore.BlockEncoder, logger *zap.Logger, tracer logging.Tracer) (mindreader.ConsolerReader, error) { + reader := newConsoleReader(lines, logger, tracer) + + delayBetweenStats := 30 * time.Second + if tracer.Enabled() { + delayBetweenStats = 5 * time.Second + } + + go func() { + defer reader.blockRate.Stop() + + for { + select { + case <-reader.done: + return + case <-time.After(delayBetweenStats): + reader.printStats() + } + } + }() + + return reader, nil +} + +func newConsoleReader(lines chan string, logger *zap.Logger, tracer logging.Tracer) *ConsoleReader { + return &ConsoleReader{ + lines: lines, + done: make(chan interface{}), + logger: logger, + tracer: tracer, + + blockRate: dmetrics.MustNewAvgRateFromPromCounter(ConsoleReaderBlockReadCount, 1*time.Second, 30*time.Second, "blocks"), + } +} + +func (r *ConsoleReader) Done() <-chan interface{} { + return r.done +} + +func (r *ConsoleReader) Close() error { + r.closeOnce.Do(func() { + r.blockRate.SyncNow() + r.printStats() + + r.logger.Info("console reader done") + close(r.done) + }) + + return nil +} + +type blockRefView struct { + ref bstream.BlockRef +} + +func (v blockRefView) String() string { + if v.ref == nil { + return "" + } + + return v.ref.String() +} + +type blockRefViewTimestamp struct { + ref bstream.BlockRef + timestamp time.Time +} + +func (v blockRefViewTimestamp) String() string { + return fmt.Sprintf("%s @ %s", blockRefView{v.ref}, v.timestamp.Local().Format(time.RFC822Z)) +} + +func (r *ConsoleReader) printStats() { + r.logger.Info("console reader stats", + zap.Stringer("block_rate", r.blockRate), + zap.Stringer("last_block", blockRefViewTimestamp{r.lastBlock, r.lastBlockTimestamp}), + zap.Stringer("last_parent_block", blockRefView{r.lastParentBlock}), + zap.Uint64("lib", r.lib), + ) +} + +func (r *ConsoleReader) ReadBlock() (out *pbbstream.Block, err error) { + out, err = r.next() + if err != nil { + return nil, err + } + + return out, nil +} + +func (r *ConsoleReader) next() (out *pbbstream.Block, err error) { + for line := range r.lines { + if !strings.HasPrefix(line, "FIRE ") { + continue + } + + line = line[FirePrefixLen:] + + switch { + case strings.HasPrefix(line, BlockLogPrefix): + out, err = r.readBlock(line[BlockLogPrefixLen:]) + + case strings.HasPrefix(line, InitLogPrefix): + err = r.readInit(line[InitLogPrefixLen:]) + default: + if r.tracer.Enabled() { + r.logger.Debug("skipping unknown Firehose log line", zap.String("line", line)) + } + continue + } + + if err != nil { + chunks := strings.SplitN(line, " ", 2) + return nil, fmt.Errorf("%s: %s (line %q)", chunks[0], err, line) + } + + if out != nil { + return out, nil + } + } + + r.Close() + + return nil, io.EOF +} + +// Formats +// [READER_PROTOCOL_VERSION] sf.ethereum.type.v2.Block +func (r *ConsoleReader) readInit(line string) error { + chunks, err := splitInBoundedChunks(line, 2) + if err != nil { + return fmt.Errorf("split: %s", err) + } + + r.readerProtocolVersion = chunks[0] + + switch r.readerProtocolVersion { + // Implementation of RPC poller were set to use 1.0 so we keep support for it for now + case "1.0", "3.0": + // Supported + default: + return fmt.Errorf("major version of Firehose exchange protocol is unsupported (expected: one of [1.0, 3.0], found %s), you are most probably running an incompatible version of the Firehose aware node client/node poller", r.readerProtocolVersion) + } + + protobufFullyQualifiedName := chunks[1] + if protobufFullyQualifiedName == "" { + return fmt.Errorf("protobuf fully qualified name is empty, it must be set to a valid Protobuf fully qualified message type representing your block format") + } + + r.setProtoMessageType(protobufFullyQualifiedName) + + r.logger.Info("console reader protocol version init", + zap.String("version", r.readerProtocolVersion), + zap.String("protobuf_fully_qualified_name", protobufFullyQualifiedName), + ) + + return nil +} + +// Formats +// [block_num:342342342] [block_hash] [parent_num] [parent_hash] [lib:123123123] [timestamp:unix_nano] B64ENCODED_any +func (r *ConsoleReader) readBlock(line string) (out *pbbstream.Block, err error) { + if r.readerProtocolVersion == "" { + return nil, fmt.Errorf("reader protocol version not set, did you forget to send the 'FIRE INIT ' line?") + } + + chunks, err := splitInBoundedChunks(line, 5) + if err != nil { + return nil, fmt.Errorf("splitting block log line: %w", err) + } + + height, err := strconv.ParseUint(chunks[0], 10, 64) + if err != nil { + return nil, fmt.Errorf("parsing height %q: %w", chunks[0], err) + } + + blockCount, err := strconv.ParseUint(chunks[1], 10, 64) + if err != nil { + return nil, fmt.Errorf("parsing block count %q: %w", chunks[1], err) + } + + timestampUnixNano, err := strconv.ParseUint(chunks[2], 10, 64) + if err != nil { + return nil, fmt.Errorf("parsing timestamp %q: %w", chunks[2], err) + } + + timestamp := time.Unix(0, int64(timestampUnixNano)) + + tipsetCID, err := cid.Decode(chunks[3]) + if err != nil { + return nil, fmt.Errorf("parsing tipset CID %q: %w", chunks[3], err) + } + + parentTipsetCID, err := cid.Decode(chunks[4]) + if err != nil { + return nil, fmt.Errorf("parsing parent tipset CID %q: %w", chunks[4], err) + } + + parentNum := height - 1 + + libNum := uint64(1) + + blockPayload := &anypb.Any{ + TypeUrl: r.protoMessageType, + Value: []byte{byte(blockCount)}, + } + + block := &pbbstream.Block{ + Id: tipsetCID.String(), + Number: height, + ParentId: parentTipsetCID.String(), + ParentNum: parentNum, + Timestamp: timestamppb.New(timestamp), + LibNum: 1, + Payload: blockPayload, + } + + ConsoleReaderBlockReadCount.Inc() + r.lastBlock = bstream.NewBlockRef(tipsetCID.String(), height) + r.lastParentBlock = bstream.NewBlockRef(parentTipsetCID.String(), parentNum) + r.lastBlockTimestamp = timestamp + r.lib = libNum + + return block, nil +} + +func (r *ConsoleReader) setProtoMessageType(typeURL string) { + if strings.HasPrefix(typeURL, "type.googleapis.com/") { + r.protoMessageType = typeURL + return + } + + if strings.Contains(typeURL, "/") { + panic(fmt.Sprintf("invalid type url %q, expecting type.googleapis.com/", typeURL)) + } + + r.protoMessageType = "type.googleapis.com/" + typeURL +} + +// splitInBoundedChunks splits the line in `count` chunks and returns the slice `chunks[1:count]` (so exclusive end), +// but will accumulate all trailing chunks within the last (for free-form strings, or JSON objects) +func splitInBoundedChunks(line string, count int) ([]string, error) { + chunks := strings.SplitN(line, " ", count) + if len(chunks) != count { + return nil, fmt.Errorf("%d fields required but found %d fields for line %q", count, len(chunks), line) + } + + return chunks, nil +} diff --git a/console_reader/console_reader_test.go b/console_reader/console_reader_test.go new file mode 100644 index 0000000..ae49f1b --- /dev/null +++ b/console_reader/console_reader_test.go @@ -0,0 +1,83 @@ +package console_reader + +import ( + "fmt" + "github.com/ipfs/go-cid" + "testing" + "time" + + "github.com/streamingfast/logging" + "github.com/stretchr/testify/require" +) + +var zlogTest, tracerTest = logging.PackageLogger("test", "github.com/streamingfast/firehose-core/firecore") + +// Example log: +// FIRE BLOCK 2118596 1 1730884260 bafy2bzacebfhqge6ulu6hlgqv6xhnimrhgdxdzg4lccyztitpfpdpx7oe7hzk bafy2bzacebc2l6w2kzfr752pijk4xyhdg7ptiags34s35buim4ilyrmcvchmg + +func Test_Ctx_readBlock(t *testing.T) { + reader := &ConsoleReader{ + logger: zlogTest, + tracer: tracerTest, + + readerProtocolVersion: "1.0", + protoMessageType: "type.googleapis.com/sf.ethereum.type.v2.Block", + } + + height := uint64(2118551) + blockCount := 1 + tipsetCID, _ := cid.Decode("bafy2bzacedv2decyusd7n2kpknsgpiaopxh7ve3grzyfkrxouaza5ohypoi2c") + parentTipsetCID, _ := cid.Decode("bafy2bzacebgnvekynqmddtbkdw77ygogoc4omoy5fsgtrhbd7kxwfsbgcaphm") + + //pbBlock := test.Block{ + // Hash: tipsetCID.Bytes(), + // Number: height, + //} + + //anypbBlock, err := anypb.New(&pbBlock) + + //require.NoError(t, err) + nowNano := time.Now().UnixNano() + line := fmt.Sprintf( + "%d %d %d %s %s", + height, + blockCount, + nowNano, + tipsetCID.String(), + parentTipsetCID.String(), + ) + + block, err := reader.readBlock(line) + require.NoError(t, err) + + require.Equal(t, height, block.Number) + require.Equal(t, tipsetCID.String(), block.Id) + require.Equal(t, parentTipsetCID.String(), block.ParentId) + require.Equal(t, uint64(1), block.LibNum) + require.Equal(t, int32(time.Unix(0, nowNano).Nanosecond()), block.Timestamp.Nanos) + + require.NoError(t, err) + //require.Equal(t, anypbBlock.GetValue(), block.Payload.Value) + +} + +func Test_GetNext(t *testing.T) { + lines := make(chan string, 2) + reader := newConsoleReader(lines, zlogTest, tracerTest) + + initLine := "FIRE INIT 1.0 sf.filecoin.type.v1.Tipset" + blockLine := "FIRE BLOCK 2118596 1 1730884260 bafy2bzacebfhqge6ulu6hlgqv6xhnimrhgdxdzg4lccyztitpfpdpx7oe7hzk bafy2bzacebc2l6w2kzfr752pijk4xyhdg7ptiags34s35buim4ilyrmcvchmg" + + lines <- initLine + lines <- blockLine + close(lines) + + block, err := reader.ReadBlock() + require.NoError(t, err) + + require.Equal(t, uint64(2118596), block.Number) + require.Equal(t, "bafy2bzacebfhqge6ulu6hlgqv6xhnimrhgdxdzg4lccyztitpfpdpx7oe7hzk", block.Id) + require.Equal(t, "bafy2bzacebc2l6w2kzfr752pijk4xyhdg7ptiags34s35buim4ilyrmcvchmg", block.ParentId) + require.Equal(t, uint64(1), block.LibNum) + require.Equal(t, int32(time.Unix(0, 1730884260).Nanosecond()), block.Timestamp.Nanos) +} diff --git a/console_reader/metrics.go b/console_reader/metrics.go new file mode 100644 index 0000000..b9a0d69 --- /dev/null +++ b/console_reader/metrics.go @@ -0,0 +1,11 @@ +package console_reader + +import "github.com/streamingfast/dmetrics" + +func RegisterMetrics() { + metrics.Register() +} + +var metrics = dmetrics.NewSet() + +var ConsoleReaderBlockReadCount = metrics.NewCounter("firecore_console_reader_block_read_count", "Number of blocks read by the console reader")