From 0826c05829b6a96edc497e20fffca2cb0b179f49 Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?Tomasz=20Zdyba=C5=82?=
Date: Wed, 13 Oct 2021 21:42:56 +0200
Subject: [PATCH] Block sync (#139)

* Block sync

* aggregator renamed to blockManager

* blocks are passed to blockManager after gossiping

* DALC(BlockRetriever) used to fetch block data

* Improve cross-package code coverage with go-acc

Co-authored-by: Ismail Khoffi
---
 .github/workflows/test.yml |   4 +-
 block/manager.go           | 286 ++++++++++++++++++
 .../manager_test.go        |  79 +----
 config/config.go           |   6 +-
 da/mock/mock.go            |  69 ++---
 node/aggregator.go         | 172 -----------
 node/integration_test.go   | 139 +++++++--
 node/node.go               |  62 ++--
 state/executor.go          |  12 +-
 9 files changed, 483 insertions(+), 346 deletions(-)
 create mode 100644 block/manager.go
 rename node/aggregator_test.go => block/manager_test.go (51%)
 delete mode 100644 node/aggregator.go

diff --git a/.github/workflows/test.yml b/.github/workflows/test.yml
index d6abd37e3c9..87a079e8a4d 100644
--- a/.github/workflows/test.yml
+++ b/.github/workflows/test.yml
@@ -22,7 +22,9 @@ jobs:
         run: go build -v ./...
 
       - name: Test & Coverage
-        run: go test -race -coverprofile=coverage.txt -covermode=atomic -v ./...
+        run: |
+          go install github.com/ory/go-acc@v0.2.6
+          go-acc -o coverage.txt ./... -- -v --race
 
       - uses: codecov/codecov-action@v2.1.0
         with:
diff --git a/block/manager.go b/block/manager.go
new file mode 100644
index 00000000000..3f3ab671ff3
--- /dev/null
+++ b/block/manager.go
@@ -0,0 +1,286 @@
+package block
+
+import (
+	"context"
+	"fmt"
+	"sync/atomic"
+	"time"
+
+	"github.com/libp2p/go-libp2p-core/crypto"
+	"github.com/tendermint/tendermint/proxy"
+	tmtypes "github.com/tendermint/tendermint/types"
+
+	"github.com/celestiaorg/optimint/config"
+	"github.com/celestiaorg/optimint/da"
+	"github.com/celestiaorg/optimint/log"
+	"github.com/celestiaorg/optimint/mempool"
+	"github.com/celestiaorg/optimint/state"
+	"github.com/celestiaorg/optimint/store"
+	"github.com/celestiaorg/optimint/types"
+)
+
+// Manager is responsible for aggregating transactions into blocks.
+type Manager struct {
+	lastState state.State
+
+	conf    config.BlockManagerConfig
+	genesis *tmtypes.GenesisDoc
+
+	proposerKey crypto.PrivKey
+
+	store    store.Store
+	executor *state.BlockExecutor
+
+	dalc      da.DataAvailabilityLayerClient
+	retriever da.BlockRetriever
+
+	HeaderOutCh chan *types.Header
+	HeaderInCh  chan *types.Header
+
+	syncTarget uint64
+	blockInCh  chan *types.Block
+	retrieveCh chan uint64
+	syncCache  map[uint64]*types.Block
+
+	logger log.Logger
+}
+
+// getInitialState tries to load lastState from Store, and if it's not available it reads GenesisDoc.
+func getInitialState(store store.Store, genesis *tmtypes.GenesisDoc) (state.State, error) {
+	s, err := store.LoadState()
+	if err != nil {
+		s, err = state.NewFromGenesisDoc(genesis)
+	}
+	return s, err
+}
+
+func NewManager(
+	proposerKey crypto.PrivKey,
+	conf config.BlockManagerConfig,
+	genesis *tmtypes.GenesisDoc,
+	store store.Store,
+	mempool mempool.Mempool,
+	proxyApp proxy.AppConnConsensus,
+	dalc da.DataAvailabilityLayerClient,
+	logger log.Logger,
+) (*Manager, error) {
+	s, err := getInitialState(store, genesis)
+	if err != nil {
+		return nil, err
+	}
+
+	proposerAddress, err := proposerKey.GetPublic().Raw()
+	if err != nil {
+		return nil, err
+	}
+
+	exec := state.NewBlockExecutor(proposerAddress, conf.NamespaceID, mempool, proxyApp, logger)
+
+	agg := &Manager{
+		proposerKey: proposerKey,
+		conf:        conf,
+		genesis:     genesis,
+		lastState:   s,
+		store:       store,
+		executor:    exec,
+		dalc:        dalc,
+		retriever:   dalc.(da.BlockRetriever), // TODO(tzdybal): do this in a more gentle way (after MVP)
+		HeaderOutCh: make(chan *types.Header),
+		HeaderInCh:  make(chan *types.Header),
+		blockInCh:   make(chan *types.Block),
+		retrieveCh:  make(chan uint64),
+		syncCache:   make(map[uint64]*types.Block),
+		logger:      logger,
+	}
+
+	return agg, nil
+}
+
+func (m *Manager) SetDALC(dalc da.DataAvailabilityLayerClient) {
+	m.dalc = dalc
+	m.retriever = dalc.(da.BlockRetriever)
+}
+
+func (m *Manager) AggregationLoop(ctx context.Context) {
+	timer := time.NewTimer(m.conf.BlockTime)
+	for {
+		select {
+		case <-ctx.Done():
+			return
+		case <-timer.C:
+			start := time.Now()
+			err := m.publishBlock(ctx)
+			if err != nil {
+				m.logger.Error("error while publishing block", "error", err)
+			}
+			timer.Reset(m.getRemainingSleep(start))
+		}
+	}
+}
+
+func (m *Manager) SyncLoop(ctx context.Context) {
+	for {
+		select {
+		case header := <-m.HeaderInCh:
+			m.logger.Debug("block header received", "height", header.Height, "hash", header.Hash())
+			newHeight := header.Height
+			currentHeight := m.store.Height()
+			// in case of a client reconnecting after being offline,
+			// newHeight may be significantly larger than currentHeight;
+			// this is handled gracefully in RetrieveLoop
+			if newHeight > currentHeight {
+				atomic.StoreUint64(&m.syncTarget, newHeight)
+				m.retrieveCh <- newHeight
+			}
+		case block := <-m.blockInCh:
+			m.logger.Debug("block body retrieved from DALC",
+				"height", block.Header.Height,
+				"hash", block.Hash(),
+			)
+			m.syncCache[block.Header.Height] = block
+			currentHeight := m.store.Height() // TODO(tzdybal): maybe store a copy in memory
+			b1, ok1 := m.syncCache[currentHeight+1]
+			b2, ok2 := m.syncCache[currentHeight+2]
+			if ok1 && ok2 {
+				newState, _, err := m.executor.ApplyBlock(ctx, m.lastState, b1)
+				if err != nil {
+					m.logger.Error("failed to ApplyBlock", "error", err)
+					continue
+				}
+				err = m.store.SaveBlock(b1, &b2.LastCommit)
+				if err != nil {
+					m.logger.Error("failed to save block", "error", err)
+					continue
+				}
+
+				m.lastState = newState
+				err = m.store.UpdateState(m.lastState)
+				if err != nil {
+					m.logger.Error("failed to save updated state", "error", err)
+					continue
+				}
+				delete(m.syncCache, currentHeight+1)
+			}
+		case <-ctx.Done():
+			return
+		}
+	}
+}
+
+func (m *Manager) RetrieveLoop(ctx context.Context) {
+	for {
+		select {
+		case <-m.retrieveCh:
+			target := atomic.LoadUint64(&m.syncTarget)
+			for h := m.store.Height() + 1; h <= target; h++ {
+				m.logger.Debug("trying to retrieve block from DALC", "height", h)
+				m.mustRetrieveBlock(ctx, h)
+			}
+		case <-ctx.Done():
+			return
+		}
+	}
+}
+
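Note: mustRetrieveBlock below retries with a fixed 100 ms sleep, and its TODOs name exponential backoff as the intended follow-up. A minimal sketch of what that could look like, assuming a hypothetical helper that is not part of this patch:

	package block

	import (
		"context"
		"fmt"
		"time"
	)

	// retryWithBackoff is a hypothetical illustration of the "exponential
	// backoff" TODO below; it doubles the delay after every failed attempt
	// and stops early if the context is cancelled.
	func retryWithBackoff(ctx context.Context, maxRetries int, fetch func() error) error {
		backoff := 100 * time.Millisecond
		for r := 0; r < maxRetries; r++ {
			if err := fetch(); err == nil {
				return nil
			}
			select {
			case <-ctx.Done():
				return ctx.Err()
			case <-time.After(backoff):
			}
			backoff *= 2
		}
		return fmt.Errorf("all %d retries failed", maxRetries)
	}

mustRetrieveBlock could then call retryWithBackoff(ctx, maxRetries, func() error { return m.fetchBlock(ctx, height) }) in place of the fixed sleep.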
+func (m *Manager) mustRetrieveBlock(ctx context.Context, height uint64) {
+	// TODO(tzdybal): extract configuration option
+	maxRetries := 10
+
+	for r := 0; r < maxRetries; r++ {
+		err := m.fetchBlock(ctx, height)
+		if err == nil {
+			return
+		}
+		// TODO(tzdybal): configuration option
+		// TODO(tzdybal): exponential backoff
+		time.Sleep(100 * time.Millisecond)
+	}
+	// TODO(tzdybal): this is only a temporary solution for the MVP
+	panic("failed to retrieve block with DALC")
+}
+
+func (m *Manager) fetchBlock(ctx context.Context, height uint64) error {
+	var err error
+	blockRes := m.retriever.RetrieveBlock(height)
+	switch blockRes.Code {
+	case da.StatusSuccess:
+		m.blockInCh <- blockRes.Block
+	case da.StatusError:
+		err = fmt.Errorf("failed to retrieve block: %s", blockRes.Message)
+	case da.StatusTimeout:
+		err = fmt.Errorf("timeout while retrieving block: %s", blockRes.Message)
+	}
+	return err
+}
+
+func (m *Manager) getRemainingSleep(start time.Time) time.Duration {
+	publishingDuration := time.Since(start)
+	sleepDuration := m.conf.BlockTime - publishingDuration
+	if sleepDuration < 0 {
+		sleepDuration = 0
+	}
+	return sleepDuration
+}
+
+func (m *Manager) publishBlock(ctx context.Context) error {
+	var lastCommit *types.Commit
+	var err error
+	newHeight := m.store.Height() + 1
+
+	// this is a special case, when the first block is produced - there is no previous commit
+	if newHeight == uint64(m.genesis.InitialHeight) {
+		lastCommit = &types.Commit{Height: m.store.Height(), HeaderHash: [32]byte{}}
+	} else {
+		lastCommit, err = m.store.LoadCommit(m.store.Height())
+		if err != nil {
+			return fmt.Errorf("error while loading last commit: %w", err)
+		}
+	}
+
+	m.logger.Info("Creating and publishing block", "height", newHeight)
+
+	block := m.executor.CreateBlock(newHeight, lastCommit, m.lastState)
+	m.logger.Debug("block info", "num_tx", len(block.Data.Txs))
+	newState, _, err := m.executor.ApplyBlock(ctx, m.lastState, block)
+	if err != nil {
+		return err
+	}
+
+	headerBytes, err := block.Header.MarshalBinary()
+	if err != nil {
+		return err
+	}
+	sign, err := m.proposerKey.Sign(headerBytes)
+	if err != nil {
+		return err
+	}
+
+	commit := &types.Commit{
+		Height:     block.Header.Height,
+		HeaderHash: block.Header.Hash(),
+		Signatures: []types.Signature{sign},
+	}
+	err = m.store.SaveBlock(block, commit)
+	if err != nil {
+		return err
+	}
+
+	m.lastState = newState
+	err = m.store.UpdateState(m.lastState)
+	if err != nil {
+		return err
+	}
+
+	return m.broadcastBlock(ctx, block)
+}
+
+func (m *Manager) broadcastBlock(ctx context.Context, block *types.Block) error {
+	res := m.dalc.SubmitBlock(block)
+	if res.Code != da.StatusSuccess {
+		return fmt.Errorf("DA layer submission failed: %s", res.Message)
+	}
+
+	m.HeaderOutCh <- &block.Header
+
+	return nil
+}
diff --git a/node/aggregator_test.go b/block/manager_test.go
similarity index 51%
rename from node/aggregator_test.go
rename to block/manager_test.go
index df12578e551..5d37de6f8b1 100644
--- a/node/aggregator_test.go
+++ b/block/manager_test.go
@@ -1,79 +1,23 @@
-package node
+package block
 
 import (
-	"context"
 	"crypto/rand"
-	mrand "math/rand"
 	"testing"
 	"time"
 
-	abci "github.com/tendermint/tendermint/abci/types"
-	"github.com/tendermint/tendermint/libs/log"
-	"github.com/tendermint/tendermint/proxy"
-	"github.com/tendermint/tendermint/types"
 	"github.com/libp2p/go-libp2p-core/crypto"
-	"github.com/libp2p/go-libp2p-core/peer"
 	"github.com/stretchr/testify/assert"
-	"github.com/stretchr/testify/mock"
 	"github.com/stretchr/testify/require"
+	"github.com/tendermint/tendermint/libs/log"
"github.com/tendermint/tendermint/libs/log" + "github.com/tendermint/tendermint/types" "github.com/celestiaorg/optimint/config" - "github.com/celestiaorg/optimint/mocks" - "github.com/celestiaorg/optimint/p2p" + "github.com/celestiaorg/optimint/da" + mockda "github.com/celestiaorg/optimint/da/mock" "github.com/celestiaorg/optimint/state" "github.com/celestiaorg/optimint/store" ) -func TestAggregatorMode(t *testing.T) { - assert := assert.New(t) - require := require.New(t) - - app := &mocks.Application{} - app.On("CheckTx", mock.Anything).Return(abci.ResponseCheckTx{}) - app.On("BeginBlock", mock.Anything).Return(abci.ResponseBeginBlock{}) - app.On("DeliverTx", mock.Anything).Return(abci.ResponseDeliverTx{}) - app.On("EndBlock", mock.Anything).Return(abci.ResponseEndBlock{}) - app.On("Commit", mock.Anything).Return(abci.ResponseCommit{}) - - key, _, _ := crypto.GenerateEd25519Key(rand.Reader) - anotherKey, _, _ := crypto.GenerateEd25519Key(rand.Reader) - - aggregatorConfig := config.AggregatorConfig{ - BlockTime: 500 * time.Millisecond, - NamespaceID: [8]byte{1, 2, 3, 4, 5, 6, 7, 8}, - } - node, err := NewNode(context.Background(), config.NodeConfig{DALayer: "mock", Aggregator: true, AggregatorConfig: aggregatorConfig}, key, proxy.NewLocalClientCreator(app), &types.GenesisDoc{ChainID: "test"}, log.TestingLogger()) - require.NoError(err) - require.NotNil(node) - - assert.False(node.IsRunning()) - - err = node.Start() - assert.NoError(err) - defer func() { - err := node.Stop() - assert.NoError(err) - }() - assert.True(node.IsRunning()) - - pid, err := peer.IDFromPrivateKey(anotherKey) - require.NoError(err) - ctx, cancel := context.WithCancel(context.TODO()) - go func() { - for { - select { - case <-ctx.Done(): - return - default: - node.incomingTxCh <- &p2p.GossipMessage{Data: []byte(time.Now().String()), From: pid} - time.Sleep(time.Duration(mrand.Uint32()%20) * time.Millisecond) - } - } - }() - time.Sleep(5 * time.Second) - cancel() -} - func TestInitialState(t *testing.T) { genesis := &types.GenesisDoc{ ChainID: "genesis id", @@ -118,7 +62,7 @@ func TestInitialState(t *testing.T) { } key, _, _ := crypto.GenerateEd25519Key(rand.Reader) - conf := config.AggregatorConfig{ + conf := config.BlockManagerConfig{ BlockTime: 10 * time.Second, NamespaceID: [8]byte{1, 2, 3, 4, 5, 6, 7, 8}, } @@ -126,7 +70,9 @@ func TestInitialState(t *testing.T) { for _, c := range cases { t.Run(c.name, func(t *testing.T) { assert := assert.New(t) - agg, err := newAggregator(key, conf, c.genesis, c.store, nil, nil, nil, log.TestingLogger()) + logger := log.TestingLogger() + dalc := getMockDALC(logger) + agg, err := NewManager(key, conf, c.genesis, c.store, nil, nil, dalc, logger) assert.NoError(err) assert.NotNil(agg) assert.Equal(c.expectedChainID, agg.lastState.ChainID) @@ -135,3 +81,10 @@ func TestInitialState(t *testing.T) { }) } } + +func getMockDALC(logger log.Logger) da.DataAvailabilityLayerClient { + dalc := &mockda.MockDataAvailabilityLayerClient{} + _ = dalc.Init(nil, nil, logger) + _ = dalc.Start() + return dalc +} diff --git a/config/config.go b/config/config.go index 21a08b33f2d..0a3bc2d2102 100644 --- a/config/config.go +++ b/config/config.go @@ -8,13 +8,13 @@ type NodeConfig struct { DBPath string P2P P2PConfig Aggregator bool - AggregatorConfig + BlockManagerConfig DALayer string DAConfig []byte } -// AggregatorConfig consists of all parameters required by Aggregator. 
-type AggregatorConfig struct {
+// BlockManagerConfig consists of all parameters required by BlockManager.
+type BlockManagerConfig struct {
 	BlockTime   time.Duration
 	NamespaceID [8]byte
 }
diff --git a/da/mock/mock.go b/da/mock/mock.go
index 0fad3242151..b4516d83ab2 100644
--- a/da/mock/mock.go
+++ b/da/mock/mock.go
@@ -1,7 +1,7 @@
 package mock
 
 import (
-	"encoding/binary"
+	"sync"
 
 	"github.com/celestiaorg/optimint/da"
 	"github.com/celestiaorg/optimint/log"
@@ -13,16 +13,23 @@ import (
 // It actually does ensure DA - it stores data in-memory.
 type MockDataAvailabilityLayerClient struct {
 	logger log.Logger
-	dalcKV store.KVStore
+
+	Blocks     map[[32]byte]*types.Block
+	BlockIndex map[uint64][32]byte
+
+	mtx sync.Mutex
 }
 
 var _ da.DataAvailabilityLayerClient = &MockDataAvailabilityLayerClient{}
 var _ da.BlockRetriever = &MockDataAvailabilityLayerClient{}
 
 // Init is called once to allow DA client to read configuration and initialize resources.
-func (m *MockDataAvailabilityLayerClient) Init(config []byte, dalcKV store.KVStore, logger log.Logger) error {
+func (m *MockDataAvailabilityLayerClient) Init(config []byte, kvStore store.KVStore, logger log.Logger) error {
+	m.mtx.Lock()
+	defer m.mtx.Unlock()
 	m.logger = logger
-	m.dalcKV = dalcKV
+	m.Blocks = make(map[[32]byte]*types.Block)
+	m.BlockIndex = make(map[uint64][32]byte)
 	return nil
 }
 
@@ -42,22 +49,13 @@ func (m *MockDataAvailabilityLayerClient) Stop() error {
 // This should create a transaction which (potentially)
 // triggers a state transition in the DA layer.
 func (m *MockDataAvailabilityLayerClient) SubmitBlock(block *types.Block) da.ResultSubmitBlock {
+	m.mtx.Lock()
+	defer m.mtx.Unlock()
 	m.logger.Debug("Submitting block to DA layer!", "height", block.Header.Height)
 
 	hash := block.Header.Hash()
-	blob, err := block.MarshalBinary()
-	if err != nil {
-		return da.ResultSubmitBlock{DAResult: da.DAResult{Code: da.StatusError, Message: err.Error()}}
-	}
-
-	err = m.dalcKV.Set(getKey(block.Header.Height), hash[:])
-	if err != nil {
-		return da.ResultSubmitBlock{DAResult: da.DAResult{Code: da.StatusError, Message: err.Error()}}
-	}
-	err = m.dalcKV.Set(hash[:], blob)
-	if err != nil {
-		return da.ResultSubmitBlock{DAResult: da.DAResult{Code: da.StatusError, Message: err.Error()}}
-	}
+	m.Blocks[hash] = block
+	m.BlockIndex[block.Header.Height] = hash
 
 	return da.ResultSubmitBlock{
 		DAResult: da.DAResult{
@@ -69,36 +67,19 @@ func (m *MockDataAvailabilityLayerClient) SubmitBlock(block *types.Block) da.Res
 
 // CheckBlockAvailability queries DA layer to check data availability of block corresponding to given header.
 func (m *MockDataAvailabilityLayerClient) CheckBlockAvailability(header *types.Header) da.ResultCheckBlock {
-	hash := header.Hash()
-	_, err := m.dalcKV.Get(hash[:])
-	if err != nil {
-		return da.ResultCheckBlock{DAResult: da.DAResult{Code: da.StatusSuccess}, DataAvailable: false}
-	}
-	return da.ResultCheckBlock{DAResult: da.DAResult{Code: da.StatusSuccess}, DataAvailable: true}
+	m.mtx.Lock()
+	defer m.mtx.Unlock()
+	_, ok := m.Blocks[header.Hash()]
+	return da.ResultCheckBlock{DAResult: da.DAResult{Code: da.StatusSuccess}, DataAvailable: ok}
 }
 
 // RetrieveBlock returns block at given height from data availability layer.
 func (m *MockDataAvailabilityLayerClient) RetrieveBlock(height uint64) da.ResultRetrieveBlock {
-	hash, err := m.dalcKV.Get(getKey(height))
-	if err != nil {
-		return da.ResultRetrieveBlock{DAResult: da.DAResult{Code: da.StatusError, Message: err.Error()}}
+	m.mtx.Lock()
+	defer m.mtx.Unlock()
+	hash, ok := m.BlockIndex[height]
+	if !ok {
+		return da.ResultRetrieveBlock{DAResult: da.DAResult{Code: da.StatusError}}
 	}
-	blob, err := m.dalcKV.Get(hash)
-	if err != nil {
-		return da.ResultRetrieveBlock{DAResult: da.DAResult{Code: da.StatusError, Message: err.Error()}}
-	}
-
-	block := &types.Block{}
-	err = block.UnmarshalBinary(blob)
-	if err != nil {
-		return da.ResultRetrieveBlock{DAResult: da.DAResult{Code: da.StatusError, Message: err.Error()}}
-	}
-
-	return da.ResultRetrieveBlock{DAResult: da.DAResult{Code: da.StatusSuccess}, Block: block}
-}
-
-func getKey(height uint64) []byte {
-	b := make([]byte, 8)
-	binary.BigEndian.PutUint64(b, height)
-	return b
+	return da.ResultRetrieveBlock{DAResult: da.DAResult{Code: da.StatusSuccess}, Block: m.Blocks[hash]}
 }
diff --git a/node/aggregator.go b/node/aggregator.go
deleted file mode 100644
index 9e94ed11452..00000000000
--- a/node/aggregator.go
+++ /dev/null
@@ -1,172 +0,0 @@
-package node
-
-import (
-	"context"
-	"fmt"
-	"time"
-
-	"github.com/libp2p/go-libp2p-core/crypto"
-	"github.com/tendermint/tendermint/proxy"
-	lltypes "github.com/tendermint/tendermint/types"
-
-	"github.com/celestiaorg/optimint/config"
-	"github.com/celestiaorg/optimint/da"
-	"github.com/celestiaorg/optimint/log"
-	"github.com/celestiaorg/optimint/mempool"
-	"github.com/celestiaorg/optimint/state"
-	"github.com/celestiaorg/optimint/store"
-	"github.com/celestiaorg/optimint/types"
-)
-
-// aggregator is responsible for aggregating transactions into blocks.
-type aggregator struct {
-	lastState state.State
-
-	conf    config.AggregatorConfig
-	genesis *lltypes.GenesisDoc
-
-	proposerKey crypto.PrivKey
-
-	store    store.Store
-	dalc     da.DataAvailabilityLayerClient
-	executor *state.BlockExecutor
-
-	headerCh chan *types.Header
-
-	logger log.Logger
-}
-
-// getInitialState tries to load lastState from Store, and if it's not available it reads GenesisDoc.
-func getInitialState(store store.Store, genesis *lltypes.GenesisDoc) (state.State, error) {
-	s, err := store.LoadState()
-	if err != nil {
-		s, err = state.NewFromGenesisDoc(genesis)
-	}
-	return s, err
-}
-
-func newAggregator(
-	proposerKey crypto.PrivKey,
-	conf config.AggregatorConfig,
-	genesis *lltypes.GenesisDoc,
-	store store.Store,
-	mempool mempool.Mempool,
-	proxyApp proxy.AppConnConsensus,
-	dalc da.DataAvailabilityLayerClient,
-	logger log.Logger,
-) (*aggregator, error) {
-	s, err := getInitialState(store, genesis)
-	if err != nil {
-		return nil, err
-	}
-
-	proposerAddress, err := proposerKey.GetPublic().Raw()
-	if err != nil {
-		return nil, err
-	}
-
-	exec := state.NewBlockExecutor(proposerAddress, conf.NamespaceID, mempool, proxyApp, logger)
-
-	agg := &aggregator{
-		proposerKey: proposerKey,
-		conf:        conf,
-		genesis:     genesis,
-		lastState:   s,
-		store:       store,
-		executor:    exec,
-		dalc:        dalc,
-		headerCh:    make(chan *types.Header),
-		logger:      logger,
-	}
-
-	return agg, nil
-}
-
-func (a *aggregator) aggregationLoop(ctx context.Context) {
-	timer := time.NewTimer(a.conf.BlockTime)
-	for {
-		select {
-		case <-ctx.Done():
-			return
-		case <-timer.C:
-			start := time.Now()
-			err := a.publishBlock(ctx)
-			if err != nil {
-				a.logger.Error("error while publishing block", "error", err)
-			}
-			timer.Reset(a.getRemainingSleep(start))
-		}
-	}
-}
-
-func (a *aggregator) getRemainingSleep(start time.Time) time.Duration {
-	publishingDuration := time.Since(start)
-	sleepDuration := a.conf.BlockTime - publishingDuration
-	if sleepDuration < 0 {
-		sleepDuration = 0
-	}
-	return sleepDuration
-}
-
-func (a *aggregator) publishBlock(ctx context.Context) error {
-	a.logger.Info("Creating and publishing block", "height", a.store.Height())
-
-	var lastCommit *types.Commit
-	var err error
-	newHeight := a.store.Height() + 1
-
-	// this is a special case, when first block is produced - there is no previous commit
-	if newHeight == uint64(a.genesis.InitialHeight) {
-		lastCommit = &types.Commit{Height: a.store.Height(), HeaderHash: [32]byte{}}
-	} else {
-		lastCommit, err = a.store.LoadCommit(a.store.Height())
-		if err != nil {
-			return fmt.Errorf("error while loading last commit: %w", err)
-		}
-	}
-
-	block := a.executor.CreateBlock(newHeight, lastCommit, a.lastState)
-	a.logger.Debug("block info", "num_tx", len(block.Data.Txs))
-	newState, _, err := a.executor.ApplyBlock(ctx, a.lastState, block)
-	if err != nil {
-		return err
-	}
-
-	headerBytes, err := block.Header.MarshalBinary()
-	if err != nil {
-		return err
-	}
-	sign, err := a.proposerKey.Sign(headerBytes)
-	if err != nil {
-		return err
-	}
-
-	commit := &types.Commit{
-		Height:     block.Header.Height,
-		HeaderHash: block.Header.Hash(),
-		Signatures: []types.Signature{sign},
-	}
-	err = a.store.SaveBlock(block, commit)
-	if err != nil {
-		return err
-	}
-
-	a.lastState = newState
-	err = a.store.UpdateState(a.lastState)
-	if err != nil {
-		return err
-	}
-
-	return a.broadcastBlock(ctx, block)
-}
-
-func (a *aggregator) broadcastBlock(ctx context.Context, block *types.Block) error {
-	res := a.dalc.SubmitBlock(block)
-	if res.Code != da.StatusSuccess {
-		return fmt.Errorf("DA layer submission failed: %s", res.Message)
-	}
-
-	a.headerCh <- &block.Header
-
-	return nil
-}
diff --git a/node/integration_test.go b/node/integration_test.go
index 3cbcca6b724..905eb996fce 100644
--- a/node/integration_test.go
+++ b/node/integration_test.go
@@ -3,6 +3,10 @@ package node
 import (
 	"context"
 	"crypto/rand"
+	mockda "github.com/celestiaorg/optimint/da/mock"
+	"github.com/celestiaorg/optimint/p2p"
"github.com/celestiaorg/optimint/p2p" + "github.com/stretchr/testify/assert" + mrand "math/rand" "strconv" "strings" "testing" @@ -18,15 +22,68 @@ import ( "github.com/tendermint/tendermint/types" "github.com/celestiaorg/optimint/config" + "github.com/celestiaorg/optimint/da" "github.com/celestiaorg/optimint/mocks" ) +func TestAggregatorMode(t *testing.T) { + assert := assert.New(t) + require := require.New(t) + + app := &mocks.Application{} + app.On("CheckTx", mock.Anything).Return(abci.ResponseCheckTx{}) + app.On("BeginBlock", mock.Anything).Return(abci.ResponseBeginBlock{}) + app.On("DeliverTx", mock.Anything).Return(abci.ResponseDeliverTx{}) + app.On("EndBlock", mock.Anything).Return(abci.ResponseEndBlock{}) + app.On("Commit", mock.Anything).Return(abci.ResponseCommit{}) + + key, _, _ := crypto.GenerateEd25519Key(rand.Reader) + anotherKey, _, _ := crypto.GenerateEd25519Key(rand.Reader) + + blockManagerConfig := config.BlockManagerConfig{ + BlockTime: 1 * time.Second, + NamespaceID: [8]byte{1, 2, 3, 4, 5, 6, 7, 8}, + } + node, err := NewNode(context.Background(), config.NodeConfig{DALayer: "mock", Aggregator: true, BlockManagerConfig: blockManagerConfig}, key, proxy.NewLocalClientCreator(app), &types.GenesisDoc{ChainID: "test"}, log.TestingLogger()) + require.NoError(err) + require.NotNil(node) + + assert.False(node.IsRunning()) + + err = node.Start() + assert.NoError(err) + defer func() { + err := node.Stop() + assert.NoError(err) + }() + assert.True(node.IsRunning()) + + pid, err := peer.IDFromPrivateKey(anotherKey) + require.NoError(err) + ctx, cancel := context.WithCancel(context.TODO()) + go func() { + for { + select { + case <-ctx.Done(): + return + default: + node.incomingTxCh <- &p2p.GossipMessage{Data: []byte(time.Now().String()), From: pid} + time.Sleep(time.Duration(mrand.Uint32()%20) * time.Millisecond) + } + } + }() + time.Sleep(5 * time.Second) + cancel() +} + // TestTxGossipingAndAggregation setups a network of nodes, with single aggregator and multiple producers. // Nodes should gossip transactions and aggregator node should produce blocks. 
 func TestTxGossipingAndAggregation(t *testing.T) {
+	assert := assert.New(t)
 	require := require.New(t)
 
-	nodes, aggApp := createNodes(11, t)
+	clientNodes := 4
+	nodes, apps := createNodes(clientNodes+1, t)
 
 	for _, n := range nodes {
 		require.NoError(n.Start())
@@ -44,12 +101,48 @@ func TestTxGossipingAndAggregation(t *testing.T) {
 	for _, n := range nodes {
 		require.NoError(n.Stop())
 	}
+	aggApp := apps[0]
+	apps = apps[1:]
 
-	aggApp.AssertNumberOfCalls(t, "DeliverTx", 10)
+	aggApp.AssertNumberOfCalls(t, "DeliverTx", clientNodes)
 	aggApp.AssertExpectations(t)
+
+	for i, app := range apps {
+		app.AssertNumberOfCalls(t, "DeliverTx", clientNodes)
+		app.AssertExpectations(t)
+
+		// assert that we have most of the blocks from aggregator
+		beginCnt := 0
+		endCnt := 0
+		commitCnt := 0
+		for _, call := range app.Calls {
+			switch call.Method {
+			case "BeginBlock":
+				beginCnt++
+			case "EndBlock":
+				endCnt++
+			case "Commit":
+				commitCnt++
+			}
+		}
+		aggregatorHeight := nodes[0].Store.Height()
+		adjustedHeight := int(aggregatorHeight - 3) // 3 is completely arbitrary
+		assert.GreaterOrEqual(beginCnt, adjustedHeight)
+		assert.GreaterOrEqual(endCnt, adjustedHeight)
+		assert.GreaterOrEqual(commitCnt, adjustedHeight)
+
+		// assert that all blocks known to node are same as produced by aggregator
+		for h := uint64(1); h <= nodes[i].Store.Height(); h++ {
+			nodeBlock, err := nodes[i].Store.LoadBlock(h)
+			require.NoError(err)
+			aggBlock, err := nodes[0].Store.LoadBlock(h)
+			require.NoError(err)
+			assert.Equal(aggBlock, nodeBlock)
+		}
+	}
 }
 
-func createNodes(num int, t *testing.T) ([]*Node, *mocks.Application) {
+func createNodes(num int, t *testing.T) ([]*Node, []*mocks.Application) {
 	t.Helper()
 
 	// create keys first, as they are required for P2P connections
@@ -59,16 +152,19 @@ func createNodes(num int, t *testing.T) ([]*Node, *mocks.Application) {
 	}
 
 	nodes := make([]*Node, num)
-	var aggApp *mocks.Application
-	nodes[0], aggApp = createNode(0, true, keys, t)
+	apps := make([]*mocks.Application, num)
+	dalc := &mockda.MockDataAvailabilityLayerClient{}
+	_ = dalc.Init(nil, nil, log.TestingLogger())
+	_ = dalc.Start()
+	nodes[0], apps[0] = createNode(0, true, dalc, keys, t)
 	for i := 1; i < num; i++ {
-		nodes[i], _ = createNode(i, false, keys, t)
+		nodes[i], apps[i] = createNode(i, false, dalc, keys, t)
 	}
 
-	return nodes, aggApp
+	return nodes, apps
 }
 
-func createNode(n int, aggregator bool, keys []crypto.PrivKey, t *testing.T) (*Node, *mocks.Application) {
+func createNode(n int, aggregator bool, dalc da.DataAvailabilityLayerClient, keys []crypto.PrivKey, t *testing.T) (*Node, *mocks.Application) {
 	t.Helper()
 	require := require.New(t)
 	// nodes will listen on consecutive ports on local interface
@@ -77,8 +173,8 @@ func createNode(n int, aggregator bool, keys []crypto.PrivKey, t *testing.T) (*N
 	p2pConfig := config.P2PConfig{
 		ListenAddress: "/ip4/127.0.0.1/tcp/" + strconv.Itoa(startPort+n),
 	}
-	aggConfig := config.AggregatorConfig{
-		BlockTime:   200 * time.Millisecond,
+	bmConfig := config.BlockManagerConfig{
+		BlockTime:   1 * time.Second,
 		NamespaceID: [8]byte{8, 7, 6, 5, 4, 3, 2, 1},
 	}
 	for i := 0; i < len(keys); i++ {
@@ -94,20 +190,18 @@ func createNode(n int, aggregator bool, keys []crypto.PrivKey, t *testing.T) (*N
 
 	app := &mocks.Application{}
 	app.On("CheckTx", mock.Anything).Return(abci.ResponseCheckTx{})
-	if aggregator {
-		app.On("BeginBlock", mock.Anything).Return(abci.ResponseBeginBlock{})
-		app.On("DeliverTx", mock.Anything).Return(abci.ResponseDeliverTx{})
-		app.On("EndBlock", mock.Anything).Return(abci.ResponseEndBlock{})
-		app.On("Commit", mock.Anything).Return(abci.ResponseCommit{})
-	}
+	app.On("BeginBlock", mock.Anything).Return(abci.ResponseBeginBlock{})
+	app.On("EndBlock", mock.Anything).Return(abci.ResponseEndBlock{})
+	app.On("Commit", mock.Anything).Return(abci.ResponseCommit{})
+	app.On("DeliverTx", mock.Anything).Return(abci.ResponseDeliverTx{})
 
 	node, err := NewNode(
 		context.Background(),
 		config.NodeConfig{
-			P2P:              p2pConfig,
-			DALayer:          "mock",
-			Aggregator:       aggregator,
-			AggregatorConfig: aggConfig,
+			P2P:                p2pConfig,
+			DALayer:            "mock",
+			Aggregator:         aggregator,
+			BlockManagerConfig: bmConfig,
 		},
 		keys[n],
 		proxy.NewLocalClientCreator(app),
@@ -116,5 +210,10 @@ func createNode(n int, aggregator bool, keys []crypto.PrivKey, t *testing.T) (*N
 	require.NoError(err)
 	require.NotNil(node)
 
+	// use same, common DALC, so nodes can share data
+	node.dalc = dalc
+	node.blockManager.SetDALC(dalc)
+
 	return node, app
 }
+
diff --git a/node/node.go b/node/node.go
index 3a4b088b2e5..5a5fc5fd7b1 100644
--- a/node/node.go
+++ b/node/node.go
@@ -5,6 +5,8 @@ import (
 	"errors"
 	"fmt"
 
+	"github.com/celestiaorg/optimint/block"
+
 	"github.com/libp2p/go-libp2p-core/crypto"
 	abci "github.com/tendermint/tendermint/abci/types"
 	llcfg "github.com/tendermint/tendermint/config"
@@ -12,7 +14,7 @@ import (
 	"github.com/tendermint/tendermint/libs/service"
 	corep2p "github.com/tendermint/tendermint/p2p"
 	"github.com/tendermint/tendermint/proxy"
-	lltypes "github.com/tendermint/tendermint/types"
+	tmtypes "github.com/tendermint/tendermint/types"
 	"go.uber.org/multierr"
 
 	"github.com/celestiaorg/optimint/config"
@@ -34,10 +36,10 @@ var (
 // It connects all the components and orchestrates their work.
 type Node struct {
 	service.BaseService
-	eventBus *lltypes.EventBus
+	eventBus *tmtypes.EventBus
 	proxyApp proxy.AppConns
 
-	genesis *lltypes.GenesisDoc
+	genesis *tmtypes.GenesisDoc
 
 	conf config.NodeConfig
 	P2P  *p2p.Client
@@ -49,9 +51,9 @@ type Node struct {
 
 	incomingHeaderCh chan *p2p.GossipMessage
 
-	Store      store.Store
-	aggregator *aggregator
-	dalc       da.DataAvailabilityLayerClient
+	Store        store.Store
+	blockManager *block.Manager
+	dalc         da.DataAvailabilityLayerClient
 
 	// keep context here only because of API compatibility
 	// - it's used in `OnStart` (defined in service.Service interface)
@@ -59,14 +61,14 @@ type Node struct {
 }
 
 // NewNode creates a new Optimint node.
-func NewNode(ctx context.Context, conf config.NodeConfig, nodeKey crypto.PrivKey, clientCreator proxy.ClientCreator, genesis *lltypes.GenesisDoc, logger log.Logger) (*Node, error) {
+func NewNode(ctx context.Context, conf config.NodeConfig, nodeKey crypto.PrivKey, clientCreator proxy.ClientCreator, genesis *tmtypes.GenesisDoc, logger log.Logger) (*Node, error) {
 	proxyApp := proxy.NewAppConns(clientCreator)
 	proxyApp.SetLogger(logger.With("module", "proxy"))
 	if err := proxyApp.Start(); err != nil {
 		return nil, fmt.Errorf("error starting proxy app connections: %w", err)
 	}
 
-	eventBus := lltypes.NewEventBus()
+	eventBus := tmtypes.NewEventBus()
 	eventBus.SetLogger(logger.With("module", "events"))
 	if err := eventBus.Start(); err != nil {
 		return nil, err
@@ -105,12 +107,9 @@ func NewNode(ctx context.Context, conf config.NodeConfig, nodeKey crypto.PrivKey
 	client.SetTxValidator(txValidator)
 	client.SetHeaderValidator(newHeaderValidator(logger))
 
-	var aggregator *aggregator = nil
-	if conf.Aggregator {
-		aggregator, err = newAggregator(nodeKey, conf.AggregatorConfig, genesis, s, mp, proxyApp.Consensus(), dalc, logger.With("module", "aggregator"))
-		if err != nil {
-			return nil, fmt.Errorf("aggregator initialization error: %w", err)
-		}
+	blockManager, err := block.NewManager(nodeKey, conf.BlockManagerConfig, genesis, s, mp, proxyApp.Consensus(), dalc, logger.With("module", "BlockManager"))
+	if err != nil {
+		return nil, fmt.Errorf("BlockManager initialization error: %w", err)
 	}
 
 	node := &Node{
@@ -119,7 +118,7 @@ func NewNode(ctx context.Context, conf config.NodeConfig, nodeKey crypto.PrivKey
 		genesis:  genesis,
 		conf:     conf,
 		P2P:      client,
-		aggregator: aggregator,
+		blockManager: blockManager,
 		dalc:       dalc,
 		Mempool:    mp,
 		mempoolIDs: mpIDs,
@@ -128,6 +127,10 @@ func NewNode(ctx context.Context, conf config.NodeConfig, nodeKey crypto.PrivKey
 		Store: s,
 		ctx:   ctx,
 	}
+	node.P2P.SetHeaderHandler(func(msg *p2p.GossipMessage) {
+		node.incomingHeaderCh <- msg
+	})
+
 	node.BaseService = *service.NewBaseService(logger, "Node", node)
 
 	return node, nil
@@ -143,22 +146,7 @@ func (n *Node) headerReadLoop(ctx context.Context) {
 			n.Logger.Error("failed to deserialize header", "error", err)
 			continue
 		}
-		// header is already validated during libp2p pubsub validation phase
-		hash := header.Hash()
-		n.Logger.Debug("header details", "height", header.Height, "hash", hash, "lastHeaderHash", header.LastHeaderHash)
-
-		// TODO(tzdybal): this is simplified version for MVP - blocks are fetched only via DALC
-		blockRes := n.dalc.(da.BlockRetriever).RetrieveBlock(header.Height)
-		var block *types.Block
-		switch blockRes.Code {
-		case da.StatusSuccess:
-			block = blockRes.Block
-		case da.StatusError:
-		case da.StatusTimeout:
-		default:
-		}
-		// TODO(tzdybal): actually apply block
-		n.Logger.Debug("full block", "block", block)
+		n.blockManager.HeaderInCh <- &header
 	case <-ctx.Done():
 		break
 	}
 }
 
 func (n *Node) headerPublishLoop(ctx context.Context) {
 	for {
 		select {
-		case header := <-n.aggregator.headerCh:
+		case header := <-n.blockManager.HeaderOutCh:
 			headerBytes, err := header.MarshalBinary()
 			if err != nil {
 				n.Logger.Error("failed to serialize block header", "error", err)
@@ -195,12 +183,12 @@ func (n *Node) OnStart() error {
 		return fmt.Errorf("error while starting data availability layer client: %w", err)
 	}
 	if n.conf.Aggregator {
-		go n.aggregator.aggregationLoop(n.ctx)
+		go n.blockManager.AggregationLoop(n.ctx)
 		go n.headerPublishLoop(n.ctx)
 	}
-	n.P2P.SetHeaderHandler(func(header *p2p.GossipMessage) {
-		n.incomingHeaderCh <- header
-	})
+	go n.blockManager.RetrieveLoop(n.ctx)
+	go n.blockManager.SyncLoop(n.ctx)
+
 	go n.headerReadLoop(n.ctx)
 
 	return nil
@@ -229,7 +217,7 @@ func (n *Node) GetLogger() log.Logger {
 }
 
 // EventBus gives access to Node's event bus.
-func (n *Node) EventBus() *lltypes.EventBus {
+func (n *Node) EventBus() *tmtypes.EventBus {
 	return n.eventBus
 }
diff --git a/state/executor.go b/state/executor.go
index 55280ee2fcd..9525d6f8d21 100644
--- a/state/executor.go
+++ b/state/executor.go
@@ -9,7 +9,7 @@ import (
 	abci "github.com/tendermint/tendermint/abci/types"
 	tmstate "github.com/tendermint/tendermint/proto/tendermint/state"
 	"github.com/tendermint/tendermint/proxy"
-	lltypes "github.com/tendermint/tendermint/types"
+	tmtypes "github.com/tendermint/tendermint/types"
 
 	abciconv "github.com/celestiaorg/optimint/conv/abci"
 	"github.com/celestiaorg/optimint/log"
@@ -108,7 +108,7 @@ func (e *BlockExecutor) updateState(state State, block *types.Block, abciRespons
 		InitialHeight:   state.InitialHeight,
 		LastBlockHeight: int64(block.Header.Height),
 		LastBlockTime:   time.Unix(int64(block.Header.Time), 0),
-		LastBlockID: lltypes.BlockID{
+		LastBlockID: tmtypes.BlockID{
 			Hash: hash[:],
 			// for now, we don't care about part set headers
 		},
@@ -116,7 +116,7 @@ func (e *BlockExecutor) updateState(state State, block *types.Block, abciRespons
 		ConsensusParams:                  state.ConsensusParams,
 		LastHeightConsensusParamsChanged: state.LastHeightConsensusParamsChanged,
 	}
-	copy(s.LastResultsHash[:], lltypes.NewResults(abciResponses.DeliverTxs).Hash())
+	copy(s.LastResultsHash[:], tmtypes.NewResults(abciResponses.DeliverTxs).Hash())
 
 	return s, nil
 }
@@ -229,7 +229,7 @@ func (e *BlockExecutor) execute(ctx context.Context, state State, block *types.B
 	return abciResponses, nil
 }
 
-func toOptimintTxs(txs lltypes.Txs) types.Txs {
+func toOptimintTxs(txs tmtypes.Txs) types.Txs {
 	optiTxs := make(types.Txs, len(txs))
 	for i := range txs {
 		optiTxs[i] = []byte(txs[i])
@@ -237,8 +237,8 @@ func toOptimintTxs(txs lltypes.Txs) types.Txs {
 	return optiTxs
 }
 
-func fromOptimintTxs(optiTxs types.Txs) lltypes.Txs {
-	txs := make(lltypes.Txs, len(optiTxs))
+func fromOptimintTxs(optiTxs types.Txs) tmtypes.Txs {
+	txs := make(tmtypes.Txs, len(optiTxs))
 	for i := range optiTxs {
 		txs[i] = []byte(optiTxs[i])
 	}
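Note: the in-memory mock above can be exercised end-to-end the same way Manager uses it: SubmitBlock stores the block under its header hash, and RetrieveBlock (via the da.BlockRetriever interface) returns it by height. A minimal, hypothetical usage sketch based only on the APIs in this patch (the Block literal is illustrative and error handling is elided):

	package main

	import (
		"github.com/tendermint/tendermint/libs/log"

		"github.com/celestiaorg/optimint/da"
		mockda "github.com/celestiaorg/optimint/da/mock"
		"github.com/celestiaorg/optimint/types"
	)

	func main() {
		// Init and Start mirror getMockDALC in block/manager_test.go.
		dalc := &mockda.MockDataAvailabilityLayerClient{}
		_ = dalc.Init(nil, nil, log.TestingLogger())
		_ = dalc.Start()

		// Submit a block, as Manager.broadcastBlock does.
		block := &types.Block{Header: types.Header{Height: 1}} // illustrative literal
		if res := dalc.SubmitBlock(block); res.Code != da.StatusSuccess {
			panic(res.Message)
		}

		// Fetch it back by height, as Manager.fetchBlock does.
		var retriever da.BlockRetriever = dalc
		if res := retriever.RetrieveBlock(1); res.Code == da.StatusSuccess {
			_ = res.Block // the block that was just submitted
		}
	}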