From c1eda345f86a25779c2203717b968c8aeb85ea2f Mon Sep 17 00:00:00 2001 From: Alexandre Bourget Date: Wed, 29 Jul 2020 12:35:12 -0400 Subject: [PATCH] Blockslog reader, more flexible, backwards and forward. First draft of a `blockfeeder`. --- blockslog/blockslog.go | 182 +++++++++++++++++++++++++------- blockslog/blockslog_test.go | 4 +- cmd/eos-p2p-blockfeed/main.go | 191 ++++++++++++++++++++++++++++++++++ p2p/client.go | 11 +- p2p/handler.go | 2 +- p2p/peer.go | 100 ++++++++++-------- p2p/proxy.go | 1 + p2p/relay.go | 2 +- p2ptypes.go | 19 ++-- 9 files changed, 407 insertions(+), 105 deletions(-) create mode 100644 cmd/eos-p2p-blockfeed/main.go diff --git a/blockslog/blockslog.go b/blockslog/blockslog.go index e546e75c..e19689c9 100644 --- a/blockslog/blockslog.go +++ b/blockslog/blockslog.go @@ -3,7 +3,6 @@ package blockslog import ( "encoding/binary" "encoding/hex" - "encoding/json" "fmt" "io" "os" @@ -11,11 +10,36 @@ import ( "github.com/eoscanada/eos-go" ) -func Process(filename string) error { - fl, err := os.Open(filename) +type Reader struct { + filename string + fl *os.File + + Version uint8 + FirstBlockNum uint32 + ChainID string + + firstOffset int64 + nextOffset int64 + prevOffset int64 +} + +func NewReader(filename string) *Reader { + return &Reader{filename: filename} +} + +func (r *Reader) Close() error { + if r.fl != nil { + return r.fl.Close() + } + return nil +} + +func (r *Reader) ReadHeader() error { + fl, err := os.Open(r.filename) if err != nil { return err } + r.fl = fl versionData := make([]byte, 4) _, err = fl.Read(versionData) @@ -24,13 +48,14 @@ func Process(filename string) error { } version := versionData[0] + r.Version = version - fmt.Println("Version", version) + // fmt.Println("Version", version) firstBlockData := []byte{1, 0, 0, 0} if version > 1 { - fmt.Println("Reading first block") + // fmt.Println("Reading first block") _, err = fl.Read(firstBlockData) if err != nil { return err @@ -38,7 +63,9 @@ func Process(filename string) error { } firstBlockNum := binary.LittleEndian.Uint32(firstBlockData) - fmt.Println("First block", firstBlockNum) + + r.FirstBlockNum = firstBlockNum + // fmt.Println("First block", firstBlockNum) // Certain conditions where the genesis state is written: // bool block_log::contains_genesis_state(uint32_t version, uint32_t first_block_num) { @@ -47,14 +74,13 @@ func Process(filename string) error { chainID := make([]byte, 32) if version >= 3 && firstBlockNum > 1 { - fmt.Println("Reading Chain ID") _, err = fl.Read(chainID) if err != nil { return err } } - fmt.Println("Chain ID", hex.EncodeToString(chainID)) + r.ChainID = hex.EncodeToString(chainID) if version != 1 { totem := make([]byte, 8) @@ -62,44 +88,128 @@ func Process(filename string) error { if err != nil { return err } - fmt.Println("Totem", totem) } - for i := 0 ; i < 5; i++{ - cnt := make([]byte, 1000000) + startPos, err := fl.Seek(0, os.SEEK_CUR) + if err != nil { + return err + } - prevPos, err := fl.Seek(0, os.SEEK_CUR) - if err != nil { - return err - } + r.firstOffset = startPos - _, err = fl.Read(cnt) - if err == io.EOF { - break - } - if err != nil { - return err - } + r.First() - d := eos.NewDecoder(cnt) - var block *eos.SignedBlock + return nil +} - if err := d.Decode(&block); err != nil { - return fmt.Errorf("decoding signed block: %w", err) - } +func (r *Reader) Next() (*eos.SignedBlock, []byte, error) { + if r.nextOffset == -1 { + return nil, nil, io.EOF + } - jsonStr, err := json.Marshal(block) - if err != nil { - return err - } + _, err := r.fl.Seek(r.nextOffset, os.SEEK_SET) + if err != nil { + return nil, nil, err + } - fmt.Println(string(jsonStr)) + blk, bytesRead, err := r.readSignedBlock() + if err != nil { + return nil, nil, err + } - if _, err = fl.Seek(prevPos+int64(d.LastPos())+8, os.SEEK_SET); err != nil { - return err - } - fmt.Println("Last pos", d.LastPos()) + r.prevOffset = r.nextOffset + r.nextOffset = r.nextOffset + int64(len(bytesRead)) + 8 + + return blk, bytesRead, nil +} + +func (r *Reader) First() { + r.nextOffset = r.firstOffset + r.prevOffset = -1 +} + +func (r *Reader) Last() error { + _, err := r.fl.Seek(-8, os.SEEK_END) + if err != nil { + return err + } + cnt := make([]byte, 8) + + _, err = r.fl.Read(cnt) + if err != nil { + return err } + + lastBlockOffset := binary.LittleEndian.Uint64(cnt) + + // TODO: perhaps check if the blocks log is empty, and the whole offset + // thing is less than a single block's size.. + + r.prevOffset = int64(lastBlockOffset) + r.nextOffset = -1 + return nil } + +func (r *Reader) Prev() (*eos.SignedBlock, []byte, error) { + if r.prevOffset == -1 { + return nil, nil, io.EOF + } + + _, err := r.fl.Seek(r.prevOffset-8, os.SEEK_SET) + if err != nil { + return nil, nil, fmt.Errorf("seek -8: %w", err) + } + + prevOffsetBin := make([]byte, 8) + _, err = r.fl.Read(prevOffsetBin) + if err != nil { + return nil, nil, fmt.Errorf("read offset: %w", err) + } + prevOffset := binary.LittleEndian.Uint64(prevOffsetBin) + + blk, bytesRead, err := r.readSignedBlock() + if err != nil { + return nil, nil, fmt.Errorf("read signed block: %w", err) + } + + r.nextOffset = r.prevOffset + r.prevOffset = int64(prevOffset) + + return blk, bytesRead, nil +} + +func (r *Reader) readSignedBlock() (block *eos.SignedBlock, bytesRead []byte, err error) { + cnt := make([]byte, 1000000) + + // prevPos, err := r.fl.Seek(offset, os.SEEK_SET) + // if err != nil { + // return + // } + + _, err = r.fl.Read(cnt) + if err != nil { + return + } + + d := eos.NewDecoder(cnt) + + if err = d.Decode(&block); err != nil { + err = fmt.Errorf("decoding signed block: %w", err) + return + } + + // jsonStr, err := json.Marshal(block) + // if err != nil { + // return err + // } + + // fmt.Println(string(jsonStr)) + + bytesRead = cnt[:d.LastPos()] + + // r.nextOffset = prevPos + int64(d.LastPos()) + 8 + // r.prevOffset = prevPos + return +} diff --git a/blockslog/blockslog_test.go b/blockslog/blockslog_test.go index aa980965..350a7a90 100644 --- a/blockslog/blockslog_test.go +++ b/blockslog/blockslog_test.go @@ -2,10 +2,8 @@ package blockslog import ( "testing" - - "github.com/stretchr/testify/require" ) func TestMe(t *testing.T) { - require.NoError(t, Process("/home/abourget/dfuse/dfuse-eosio/proj/mainnet/mindreader/data/blocks/blocks.log")) + //require.NoError(t, Process("/home/abourget/dfuse/dfuse-eosio/proj/mainnet/mindreader/data/blocks/blocks.log")) } diff --git a/cmd/eos-p2p-blockfeed/main.go b/cmd/eos-p2p-blockfeed/main.go new file mode 100644 index 00000000..660b57ae --- /dev/null +++ b/cmd/eos-p2p-blockfeed/main.go @@ -0,0 +1,191 @@ +package main + +import ( + "encoding/hex" + "flag" + "fmt" + "io" + "log" + "time" + + "github.com/eoscanada/eos-go" + "github.com/eoscanada/eos-go/blockslog" + "github.com/eoscanada/eos-go/p2p" + "github.com/pkg/errors" +) + +var peer = flag.String("peer", "localhost:9876", "peer to connect to") +var blocksLog = flag.String("blocks-log-path", "blocks/blocks.log", "Path to a valid blocks.log file") +var showLog = flag.Bool("v", false, "show detail log") + +func main() { + flag.Parse() + + if *showLog { + p2p.EnableP2PLogging() + } + defer p2p.SyncLogger() + + blkReader := blockslog.NewReader(*blocksLog) + defer blkReader.Close() + if err := blkReader.ReadHeader(); err != nil { + log.Fatal("read header", err) + } + + // firstBlk, err := blkReader.Next() + // if err != nil { + // log.Fatal(err) + // } + + if err := blkReader.Last(); err != nil { + log.Fatal("last", err) + } + + lastBlk, _, err := blkReader.Prev() + if err != nil { + log.Fatal("prev", err) + } + + cID, err := hex.DecodeString(blkReader.ChainID) + if err != nil { + log.Fatal("decode chain id", err) + } + + lastBlockID, err := lastBlk.BlockID() + if err != nil { + log.Fatal("block id compute", err) + } + + handshake := &p2p.HandshakeInfo{ + ChainID: cID, + HeadBlockNum: lastBlk.BlockNumber(), + HeadBlockID: lastBlockID, + HeadBlockTime: lastBlk.Timestamp.Time, + LastIrreversibleBlockNum: lastBlk.BlockNumber(), + LastIrreversibleBlockID: lastBlockID, + } + + fmt.Println("Connect to ", *peer, " with Chain ID:", blkReader.ChainID) + client := NewClient( + p2p.NewOutgoingPeer(*peer, "blockfeeder", handshake), + handshake, + blkReader, + ) + + client.Start() +} + +type Client struct { + peer *p2p.Peer + handshake *p2p.HandshakeInfo + readTimeout time.Duration + blkReader *blockslog.Reader +} + +func NewClient(peer *p2p.Peer, handshake *p2p.HandshakeInfo, blkReader *blockslog.Reader) *Client { + client := &Client{ + peer: peer, + handshake: handshake, + blkReader: blkReader, + } + return client +} + +func (c *Client) SetReadTimeout(readTimeout time.Duration) { + c.readTimeout = readTimeout +} + +func (c *Client) read(peer *p2p.Peer, errChannel chan error) { + for { + packet, err := peer.Read() + if err != nil { + errChannel <- fmt.Errorf("read message from %s: %w", peer.Address, err) + break + } + + //envelope := p2p.NewEnvelope(peer, peer, packet) + + fmt.Printf("Incoming message: %T %v\n", packet.P2PMessage, packet.P2PMessage) + switch m := packet.P2PMessage.(type) { + case *eos.GoAwayMessage: + errChannel <- errors.Wrapf(err, "GoAwayMessage reason %s", m.Reason) + case *eos.HandshakeMessage: + ///fmt.Println("MAMA", m.LastIrreversibleBlockNum, c.handshake.LastIrreversibleBlockNum) + if m.LastIrreversibleBlockNum < c.handshake.LastIrreversibleBlockNum { + fmt.Println("Writing notice message") + err := c.peer.WriteP2PMessage(&eos.NoticeMessage{ + KnownTrx: eos.OrderedSelectIDs{ + Mode: 0, /* mode == none */ + }, + KnownBlocks: eos.OrderedSelectIDs{ + Mode: 3, /* mode == normal */ + Pending: c.handshake.HeadBlockNum, + IDs: []eos.Checksum256{c.handshake.HeadBlockID}, + }, + }) + if err != nil { + errChannel <- err + } + + // Start PUSHING blocks! from their `LastIrreversibleBlockNum` + if err := c.pushBlocks(m.LastIrreversibleBlockNum); err != nil { + errChannel <- err + } + + } + case *eos.NoticeMessage: + case *eos.SignedBlock: + default: + } + } +} + +func (c *Client) pushBlocks(fromBlockNum uint32) error { + c.blkReader.First() + for { + blk, rawBytes, err := c.blkReader.Next() + if err == io.EOF { + return nil + } + if err != nil { + return err + } + + blkNum := blk.BlockNumber() + if blkNum < fromBlockNum { + fmt.Println("Skipping block", blkNum) + continue + } + + for i := 0; i < 100; i++ { + fmt.Println("Writing block", blkNum) + err = c.peer.WritePacket(&eos.Packet{ + Type: eos.SignedBlockType, + Payload: rawBytes, + }) + if err != nil { + return err + } + } + } +} + +func (c *Client) Start() error { + errorChannel := make(chan error, 1) + readyChannel := c.peer.Connect(errorChannel) + + for { + select { + case <-readyChannel: + go c.read(c.peer, errorChannel) + + err := c.peer.SendHandshake(c.handshake) + if err != nil { + return fmt.Errorf("start: send handshake: %w", err) + } + + case err := <-errorChannel: + return fmt.Errorf("start failed: %w", err) + } + } +} diff --git a/p2p/client.go b/p2p/client.go index f395d8bd..664be9f1 100644 --- a/p2p/client.go +++ b/p2p/client.go @@ -1,15 +1,13 @@ package p2p import ( + "fmt" "math" - - "github.com/pkg/errors" - - "go.uber.org/zap" - "time" "github.com/eoscanada/eos-go" + "github.com/pkg/errors" + "go.uber.org/zap" ) type Client struct { @@ -43,7 +41,6 @@ func (c *Client) SetReadTimeout(readTimeout time.Duration) { } func (c *Client) RegisterHandler(handler Handler) { - c.handlers = append(c.handlers, handler) } @@ -60,6 +57,7 @@ func (c *Client) read(peer *Peer, errChannel chan error) { handle.Handle(envelope) } + fmt.Printf("Incoming message: %T %v\n", packet.P2PMessage, packet.P2PMessage) switch m := packet.P2PMessage.(type) { case *eos.GoAwayMessage: errChannel <- errors.Wrapf(err, "GoAwayMessage reason %s", m.Reason) @@ -126,6 +124,7 @@ func (c *Client) read(peer *Peer, errChannel chan error) { } } } + default: } } } diff --git a/p2p/handler.go b/p2p/handler.go index 51b9d07e..ab28fbcb 100644 --- a/p2p/handler.go +++ b/p2p/handler.go @@ -31,7 +31,7 @@ var LoggerHandler = HandlerFunc(func(envelope *Envelope) { var StringLoggerHandler = HandlerFunc(func(envelope *Envelope) { name, _ := envelope.Packet.Type.Name() p2pLog.Info( - "handler Packet", + "handle incoming packet", zap.String("name", name), zap.String("sender", envelope.Sender.Address), zap.String("receiver", envelope.Receiver.Address), diff --git a/p2p/peer.go b/p2p/peer.go index 9db095c3..82da510e 100644 --- a/p2p/peer.go +++ b/p2p/peer.go @@ -1,26 +1,22 @@ package p2p import ( + "bufio" "bytes" "crypto/rand" "encoding/hex" + "encoding/json" "fmt" "io" "net" + "runtime" "time" + "github.com/eoscanada/eos-go" + "github.com/eoscanada/eos-go/ecc" "github.com/pkg/errors" - "go.uber.org/zap" - "go.uber.org/zap/zapcore" - - "runtime" - - "bufio" - - "github.com/eoscanada/eos-go" - "github.com/eoscanada/eos-go/ecc" ) type Peer struct { @@ -32,6 +28,7 @@ type Peer struct { reader io.Reader listener bool handshakeInfo *HandshakeInfo + handshakeGeneration int64 connectionTimeout time.Duration handshakeTimeout time.Duration cancelHandshakeTimeout chan bool @@ -108,11 +105,18 @@ func (p *Peer) Read() (*eos.Packet, error) { return packet, nil } -func (p *Peer) SetConnection(conn net.Conn) { +func (p *Peer) setConnection(conn net.Conn) { p.connection = conn p.reader = bufio.NewReader(p.connection) } +func (p *Peer) Close() error { + if p.connection == nil { + return nil + } + return p.connection.Close() +} + func (p *Peer) Connect(errChan chan error) (ready chan bool) { nodeID := make([]byte, 32) @@ -144,7 +148,7 @@ func (p *Peer) Connect(errChan chan error) (ready chan bool) { } p2pLog.Debug("Connected on", address2log) - p.SetConnection(conn) + p.setConnection(conn) ready <- true } else { @@ -170,8 +174,9 @@ func (p *Peer) Connect(errChan chan error) (ready chan bool) { return } p2pLog.Info("Connected to", address2log) - p.connection = conn - p.reader = bufio.NewReader(conn) + + p.setConnection(conn) + ready <- true } }() @@ -180,23 +185,24 @@ func (p *Peer) Connect(errChan chan error) (ready chan bool) { } func (p *Peer) Write(bytes []byte) (int, error) { - return p.connection.Write(bytes) } func (p *Peer) WriteP2PMessage(message eos.P2PMessage) (err error) { - packet := &eos.Packet{ Type: message.GetType(), P2PMessage: message, } + return p.WritePacket(packet) +} +func (p *Peer) WritePacket(packet *eos.Packet) error { buff := bytes.NewBuffer(make([]byte, 0, 512)) encoder := eos.NewEncoder(buff) - err = encoder.Encode(packet) + err := encoder.Encode(packet) if err != nil { - return errors.Wrapf(err, "unable to encode message %s", message) + return errors.Wrapf(err, "unable to encode message %s", packet) } _, err = p.Write(buff.Bytes()) @@ -227,12 +233,12 @@ func (p *Peer) SendRequest(startBlockNum uint32, endBlockNumber uint32) (err err zap.Uint32("end", endBlockNumber)) request := &eos.RequestMessage{ - ReqTrx: eos.OrderedBlockIDs{ - Mode: [4]byte{0, 0, 0, 0}, + ReqTrx: eos.OrderedSelectIDs{ + Mode: 0, Pending: startBlockNum, }, - ReqBlocks: eos.OrderedBlockIDs{ - Mode: [4]byte{0, 0, 0, 0}, + ReqBlocks: eos.OrderedSelectIDs{ + Mode: 0, Pending: endBlockNumber, }, } @@ -240,25 +246,25 @@ func (p *Peer) SendRequest(startBlockNum uint32, endBlockNumber uint32) (err err return errors.WithStack(p.WriteP2PMessage(request)) } -func (p *Peer) SendNotice(headBlockNum uint32, libNum uint32, mode byte) error { - p2pLog.Debug("Send Notice", - zap.String("peer", p.Address), - zap.Uint32("head", headBlockNum), - zap.Uint32("lib", libNum), - zap.Uint8("type", mode)) - - notice := &eos.NoticeMessage{ - KnownTrx: eos.OrderedBlockIDs{ - Mode: [4]byte{mode, 0, 0, 0}, - Pending: headBlockNum, - }, - KnownBlocks: eos.OrderedBlockIDs{ - Mode: [4]byte{mode, 0, 0, 0}, - Pending: libNum, - }, - } - return errors.WithStack(p.WriteP2PMessage(notice)) -} +// func (p *Peer) SendNotice(headBlockNum uint32, libNum uint32, mode byte) error { +// p2pLog.Debug("Send Notice", +// zap.String("peer", p.Address), +// zap.Uint32("head", headBlockNum), +// zap.Uint32("lib", libNum), +// zap.Uint8("type", mode)) + +// notice := &eos.NoticeMessage{ +// KnownTrx: eos.OrderedSelectIDs{ +// Mode: mode, +// Pending: headBlockNum, +// }, +// KnownBlocks: eos.OrderedSelectIDs{ +// Mode: mode, +// Pending: libNum, +// }, +// } +// return errors.WithStack(p.WriteP2PMessage(notice)) +// } func (p *Peer) SendTime() error { p2pLog.Debug("SendTime", zap.String("peer", p.Address)) @@ -268,19 +274,16 @@ func (p *Peer) SendTime() error { } func (p *Peer) SendHandshake(info *HandshakeInfo) error { - publicKey, err := ecc.NewPublicKey("EOS1111111111111111111111111111111114T1Anm") if err != nil { return errors.Wrapf(err, "sending handshake to %s: create public key", p.Address) } - p2pLog.Debug("SendHandshake", zap.String("peer", p.Address), zap.Object("info", info)) - tstamp := eos.Tstamp{Time: info.HeadBlockTime} - signature := ecc.Signature{ - Curve: ecc.CurveK1, - Content: make([]byte, 65, 65), + signature, err := ecc.NewSignature("SIG_K1_111111111111111111111111111111111111111111111111111111111111111116uk5ne") + if err != nil { + return err } handshake := &eos.HandshakeMessage{ @@ -301,6 +304,11 @@ func (p *Peer) SendHandshake(info *HandshakeInfo) error { Generation: int16(1), } + cnt, _ := json.Marshal(handshake) + + fmt.Println("sending handshake_message", string(cnt)) + //p2pLog.Debug("sending handshake_message", zap.String("peer", p.Address), zap.Reflect("info", handshake)) + err = p.WriteP2PMessage(handshake) if err != nil { err = errors.Wrapf(err, "sending handshake to %s", p.Address) diff --git a/p2p/proxy.go b/p2p/proxy.go index 4fb95e3e..50816abe 100644 --- a/p2p/proxy.go +++ b/p2p/proxy.go @@ -72,6 +72,7 @@ func (p *Proxy) handle(packet *eos.Packet, sender *Peer, receiver *Peer) error { } func triggerHandshake(peer *Peer) error { + fmt.Println("Sending handshake") return peer.SendHandshake(peer.handshakeInfo) } diff --git a/p2p/relay.go b/p2p/relay.go index b90b7574..1fab66f9 100644 --- a/p2p/relay.go +++ b/p2p/relay.go @@ -43,7 +43,7 @@ func (r *Relay) startProxy(conn net.Conn) { select { case <-destinationReadyChannel: remotePeer := newPeer(remoteAddress, fmt.Sprintf("agent-%s", remoteAddress), false, nil) - remotePeer.SetConnection(conn) + remotePeer.setConnection(conn) proxy := NewProxy(remotePeer, destinationPeer) proxy.RegisterHandlers(r.handlers) diff --git a/p2ptypes.go b/p2ptypes.go index 7ba5f9c0..8f63bcf0 100644 --- a/p2ptypes.go +++ b/p2ptypes.go @@ -589,18 +589,13 @@ const ( normal ) -type OrderedTransactionIDs struct { - Mode [4]byte `json:"mode"` - Pending uint32 `json:"pending"` - IDs []Checksum256 `json:"ids"` -} -type OrderedBlockIDs struct { - Mode [4]byte `json:"mode"` +type OrderedSelectIDs struct { + Mode uint32 `json:"mode"` Pending uint32 `json:"pending"` IDs []Checksum256 `json:"ids"` } -func (o *OrderedBlockIDs) String() string { +func (o *OrderedSelectIDs) String() string { ids := "" for _, id := range o.IDs { @@ -610,8 +605,8 @@ func (o *OrderedBlockIDs) String() string { } type NoticeMessage struct { - KnownTrx OrderedBlockIDs `json:"known_trx"` - KnownBlocks OrderedBlockIDs `json:"known_blocks"` + KnownTrx OrderedSelectIDs `json:"known_trx"` + KnownBlocks OrderedSelectIDs `json:"known_blocks"` } func (n *NoticeMessage) String() string { @@ -635,8 +630,8 @@ func (m *SyncRequestMessage) String() string { } type RequestMessage struct { - ReqTrx OrderedBlockIDs `json:"req_trx"` - ReqBlocks OrderedBlockIDs `json:"req_blocks"` + ReqTrx OrderedSelectIDs `json:"req_trx"` + ReqBlocks OrderedSelectIDs `json:"req_blocks"` } func (r *RequestMessage) String() string {