From ea0be5c994f276bf1952aa1063eabeb42f691686 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Tomasz=20Zdyba=C5=82?= Date: Wed, 6 Oct 2021 12:36:02 +0200 Subject: [PATCH 1/9] Block sync * aggregator renamed to blockManager * blocks are passed to blockManager after gossiping * DALC(BlockRetriever) used to fetch block data * more refactoring needed * more tests needed --- config/config.go | 6 +- da/mock/mock.go | 17 ++++- node/aggregator.go | 140 ++++++++++++++++++++++++++++++++++----- node/aggregator_test.go | 27 +++++--- node/integration_test.go | 33 +++++---- node/node.go | 49 +++++--------- 6 files changed, 198 insertions(+), 74 deletions(-) diff --git a/config/config.go b/config/config.go index 21a08b33f2..0a3bc2d210 100644 --- a/config/config.go +++ b/config/config.go @@ -8,13 +8,13 @@ type NodeConfig struct { DBPath string P2P P2PConfig Aggregator bool - AggregatorConfig + BlockManagerConfig DALayer string DAConfig []byte } -// AggregatorConfig consists of all parameters required by Aggregator. -type AggregatorConfig struct { +// BlockManagerConfig consists of all parameters required by BlockManagerConfig +type BlockManagerConfig struct { BlockTime time.Duration NamespaceID [8]byte } diff --git a/da/mock/mock.go b/da/mock/mock.go index 1c706f5588..b4516d83ab 100644 --- a/da/mock/mock.go +++ b/da/mock/mock.go @@ -1,6 +1,8 @@ package mock import ( + "sync" + "github.com/celestiaorg/optimint/da" "github.com/celestiaorg/optimint/log" "github.com/celestiaorg/optimint/store" @@ -14,6 +16,8 @@ type MockDataAvailabilityLayerClient struct { Blocks map[[32]byte]*types.Block BlockIndex map[uint64][32]byte + + mtx sync.Mutex } var _ da.DataAvailabilityLayerClient = &MockDataAvailabilityLayerClient{} @@ -21,6 +25,8 @@ var _ da.BlockRetriever = &MockDataAvailabilityLayerClient{} // Init is called once to allow DA client to read configuration and initialize resources. func (m *MockDataAvailabilityLayerClient) Init(config []byte, kvStore store.KVStore, logger log.Logger) error { + m.mtx.Lock() + defer m.mtx.Unlock() m.logger = logger m.Blocks = make(map[[32]byte]*types.Block) m.BlockIndex = make(map[uint64][32]byte) @@ -43,6 +49,8 @@ func (m *MockDataAvailabilityLayerClient) Stop() error { // This should create a transaction which (potentially) // triggers a state transition in the DA layer. func (m *MockDataAvailabilityLayerClient) SubmitBlock(block *types.Block) da.ResultSubmitBlock { + m.mtx.Lock() + defer m.mtx.Unlock() m.logger.Debug("Submitting block to DA layer!", "height", block.Header.Height) hash := block.Header.Hash() @@ -59,12 +67,19 @@ func (m *MockDataAvailabilityLayerClient) SubmitBlock(block *types.Block) da.Res // CheckBlockAvailability queries DA layer to check data availability of block corresponding to given header. func (m *MockDataAvailabilityLayerClient) CheckBlockAvailability(header *types.Header) da.ResultCheckBlock { + m.mtx.Lock() + defer m.mtx.Unlock() _, ok := m.Blocks[header.Hash()] return da.ResultCheckBlock{DAResult: da.DAResult{Code: da.StatusSuccess}, DataAvailable: ok} } // RetrieveBlock returns block at given height from data availability layer. func (m *MockDataAvailabilityLayerClient) RetrieveBlock(height uint64) da.ResultRetrieveBlock { - hash := m.BlockIndex[height] + m.mtx.Lock() + defer m.mtx.Unlock() + hash, ok := m.BlockIndex[height] + if !ok { + return da.ResultRetrieveBlock{DAResult: da.DAResult{Code: da.StatusError}} + } return da.ResultRetrieveBlock{DAResult: da.DAResult{Code: da.StatusSuccess}, Block: m.Blocks[hash]} } diff --git a/node/aggregator.go b/node/aggregator.go index 9e94ed1145..4c37a4b6eb 100644 --- a/node/aggregator.go +++ b/node/aggregator.go @@ -18,20 +18,28 @@ import ( "github.com/celestiaorg/optimint/types" ) -// aggregator is responsible for aggregating transactions into blocks. -type aggregator struct { +// blockManager is responsible for aggregating transactions into blocks. +type blockManager struct { lastState state.State - conf config.AggregatorConfig + conf config.BlockManagerConfig genesis *lltypes.GenesisDoc proposerKey crypto.PrivKey store store.Store - dalc da.DataAvailabilityLayerClient executor *state.BlockExecutor - headerCh chan *types.Header + dalc da.DataAvailabilityLayerClient + retriever da.BlockRetriever + + headerOutCh chan *types.Header + headerInCh chan *types.Header + + syncTarget uint64 + blockInCh chan *types.Block + retrieveCh chan uint64 + syncCache map[uint64]*types.Block logger log.Logger } @@ -45,16 +53,16 @@ func getInitialState(store store.Store, genesis *lltypes.GenesisDoc) (state.Stat return s, err } -func newAggregator( +func newBlockManager( proposerKey crypto.PrivKey, - conf config.AggregatorConfig, + conf config.BlockManagerConfig, genesis *lltypes.GenesisDoc, store store.Store, mempool mempool.Mempool, proxyApp proxy.AppConnConsensus, dalc da.DataAvailabilityLayerClient, logger log.Logger, -) (*aggregator, error) { +) (*blockManager, error) { s, err := getInitialState(store, genesis) if err != nil { return nil, err @@ -67,7 +75,7 @@ func newAggregator( exec := state.NewBlockExecutor(proposerAddress, conf.NamespaceID, mempool, proxyApp, logger) - agg := &aggregator{ + agg := &blockManager{ proposerKey: proposerKey, conf: conf, genesis: genesis, @@ -75,14 +83,19 @@ func newAggregator( store: store, executor: exec, dalc: dalc, - headerCh: make(chan *types.Header), + retriever: dalc.(da.BlockRetriever), // TODO(tzdybal): do it in more gentle way (after MVP) + headerOutCh: make(chan *types.Header), + headerInCh: make(chan *types.Header), + blockInCh: make(chan *types.Block), + retrieveCh: make(chan uint64), + syncCache: make(map[uint64]*types.Block), logger: logger, } return agg, nil } -func (a *aggregator) aggregationLoop(ctx context.Context) { +func (a *blockManager) aggregationLoop(ctx context.Context) { timer := time.NewTimer(a.conf.BlockTime) for { select { @@ -99,7 +112,100 @@ func (a *aggregator) aggregationLoop(ctx context.Context) { } } -func (a *aggregator) getRemainingSleep(start time.Time) time.Duration { +func (a *blockManager) syncLoop(ctx context.Context) { + for { + select { + case header := <-a.headerInCh: + a.logger.Debug("block header received", "height", header.Height, "hash", header.Hash()) + newHeight := header.Height + currentHeight := a.store.Height() + if newHeight > currentHeight { + // TODO(tzdybal): syncTarget should be atomic + a.syncTarget = newHeight + a.retrieveCh <- newHeight + } + case block := <-a.blockInCh: + a.logger.Debug("block body retrieved from DALC", + "height", block.Header.Height, + "hash", block.Hash(), + ) + a.syncCache[block.Header.Height] = block + currentHeight := a.store.Height() // TODO(tzdybal): maybe store a copy in memory + b1, ok1 := a.syncCache[currentHeight+1] + b2, ok2 := a.syncCache[currentHeight+2] + if ok1 && ok2 { + newState, _, err := a.executor.ApplyBlock(ctx, a.lastState, b1) + if err != nil { + a.logger.Error("failed to ApplyBlock", "error", err) + continue + } + err = a.store.SaveBlock(b1, &b2.LastCommit) + if err != nil { + a.logger.Error("failed to save block", "error", err) + continue + } + + a.lastState = newState + err = a.store.UpdateState(a.lastState) + if err != nil { + a.logger.Error("failed to save updated state", "error", err) + continue + } + delete(a.syncCache, currentHeight+1) + } + case <-ctx.Done(): + return + } + } +} + +func (a *blockManager) retrieveLoop(ctx context.Context) { + for { + select { + case _ = <-a.retrieveCh: + // TODO(tzdybal): syncTarget should be atomic + for h := a.store.Height() + 1; h <= a.syncTarget; h++ { + a.logger.Debug("trying to retrieve block from DALC", "height", h) + a.mustRetrieveBlock(ctx, h) + } + case <-ctx.Done(): + return + } + } +} + +func (a *blockManager) mustRetrieveBlock(ctx context.Context, height uint64) { + // TOOD(tzdybal): extract configuration option + maxRetries := 10 + + for r := 0; r < maxRetries; r++ { + err := a.fetchBlock(ctx, height) + if err == nil { + return + } + // TODO(tzdybal): configuration option + // TODO(tzdybal): exponential backoff + time.Sleep(100 * time.Millisecond) + } + // TODO(tzdybal): this is only temporary solution, for MVP + panic("failed to retrieve block with DALC") +} + +func (a *blockManager) fetchBlock(ctx context.Context, height uint64) error { + var err error + blockRes := a.retriever.RetrieveBlock(height) + switch blockRes.Code { + case da.StatusSuccess: + a.blockInCh <- blockRes.Block + case da.StatusError: + err = fmt.Errorf("failed to retrieve block: %s", blockRes.Message) + case da.StatusTimeout: + err = fmt.Errorf("timeout during retrieve block: %s", blockRes.Message) + } + return err +} + +func (a *blockManager) getRemainingSleep(start time.Time) time.Duration { publishingDuration := time.Since(start) sleepDuration := a.conf.BlockTime - publishingDuration if sleepDuration < 0 { @@ -108,9 +214,7 @@ func (a *aggregator) getRemainingSleep(start time.Time) time.Duration { return sleepDuration } -func (a *aggregator) publishBlock(ctx context.Context) error { - a.logger.Info("Creating and publishing block", "height", a.store.Height()) - +func (a *blockManager) publishBlock(ctx context.Context) error { var lastCommit *types.Commit var err error newHeight := a.store.Height() + 1 @@ -125,6 +229,8 @@ func (a *aggregator) publishBlock(ctx context.Context) error { } } + a.logger.Info("Creating and publishing block", "height", newHeight) + block := a.executor.CreateBlock(newHeight, lastCommit, a.lastState) a.logger.Debug("block info", "num_tx", len(block.Data.Txs)) newState, _, err := a.executor.ApplyBlock(ctx, a.lastState, block) @@ -160,13 +266,13 @@ func (a *aggregator) publishBlock(ctx context.Context) error { return a.broadcastBlock(ctx, block) } -func (a *aggregator) broadcastBlock(ctx context.Context, block *types.Block) error { +func (a *blockManager) broadcastBlock(ctx context.Context, block *types.Block) error { res := a.dalc.SubmitBlock(block) if res.Code != da.StatusSuccess { return fmt.Errorf("DA layer submission failed: %s", res.Message) } - a.headerCh <- &block.Header + a.headerOutCh <- &block.Header return nil } diff --git a/node/aggregator_test.go b/node/aggregator_test.go index df12578e55..2610aa8b21 100644 --- a/node/aggregator_test.go +++ b/node/aggregator_test.go @@ -7,17 +7,19 @@ import ( "testing" "time" - abci "github.com/tendermint/tendermint/abci/types" - "github.com/tendermint/tendermint/libs/log" - "github.com/tendermint/tendermint/proxy" - "github.com/tendermint/tendermint/types" "github.com/libp2p/go-libp2p-core/crypto" "github.com/libp2p/go-libp2p-core/peer" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/mock" "github.com/stretchr/testify/require" + abci "github.com/tendermint/tendermint/abci/types" + "github.com/tendermint/tendermint/libs/log" + "github.com/tendermint/tendermint/proxy" + "github.com/tendermint/tendermint/types" "github.com/celestiaorg/optimint/config" + "github.com/celestiaorg/optimint/da" + mockda "github.com/celestiaorg/optimint/da/mock" "github.com/celestiaorg/optimint/mocks" "github.com/celestiaorg/optimint/p2p" "github.com/celestiaorg/optimint/state" @@ -38,11 +40,11 @@ func TestAggregatorMode(t *testing.T) { key, _, _ := crypto.GenerateEd25519Key(rand.Reader) anotherKey, _, _ := crypto.GenerateEd25519Key(rand.Reader) - aggregatorConfig := config.AggregatorConfig{ + blockManagerConfig := config.BlockManagerConfig{ BlockTime: 500 * time.Millisecond, NamespaceID: [8]byte{1, 2, 3, 4, 5, 6, 7, 8}, } - node, err := NewNode(context.Background(), config.NodeConfig{DALayer: "mock", Aggregator: true, AggregatorConfig: aggregatorConfig}, key, proxy.NewLocalClientCreator(app), &types.GenesisDoc{ChainID: "test"}, log.TestingLogger()) + node, err := NewNode(context.Background(), config.NodeConfig{DALayer: "mock", Aggregator: true, BlockManagerConfig: blockManagerConfig}, key, proxy.NewLocalClientCreator(app), &types.GenesisDoc{ChainID: "test"}, log.TestingLogger()) require.NoError(err) require.NotNil(node) @@ -118,7 +120,7 @@ func TestInitialState(t *testing.T) { } key, _, _ := crypto.GenerateEd25519Key(rand.Reader) - conf := config.AggregatorConfig{ + conf := config.BlockManagerConfig{ BlockTime: 10 * time.Second, NamespaceID: [8]byte{1, 2, 3, 4, 5, 6, 7, 8}, } @@ -126,7 +128,9 @@ func TestInitialState(t *testing.T) { for _, c := range cases { t.Run(c.name, func(t *testing.T) { assert := assert.New(t) - agg, err := newAggregator(key, conf, c.genesis, c.store, nil, nil, nil, log.TestingLogger()) + logger := log.TestingLogger() + dalc := getMockDALC(logger) + agg, err := newBlockManager(key, conf, c.genesis, c.store, nil, nil, dalc, logger) assert.NoError(err) assert.NotNil(agg) assert.Equal(c.expectedChainID, agg.lastState.ChainID) @@ -135,3 +139,10 @@ func TestInitialState(t *testing.T) { }) } } + +func getMockDALC(logger log.Logger) da.DataAvailabilityLayerClient { + dalc := &mockda.MockDataAvailabilityLayerClient{} + dalc.Init(nil, nil, logger) + dalc.Start() + return dalc +} diff --git a/node/integration_test.go b/node/integration_test.go index 3cbcca6b72..7d203972df 100644 --- a/node/integration_test.go +++ b/node/integration_test.go @@ -18,6 +18,7 @@ import ( "github.com/tendermint/tendermint/types" "github.com/celestiaorg/optimint/config" + "github.com/celestiaorg/optimint/da" "github.com/celestiaorg/optimint/mocks" ) @@ -60,15 +61,16 @@ func createNodes(num int, t *testing.T) ([]*Node, *mocks.Application) { nodes := make([]*Node, num) var aggApp *mocks.Application - nodes[0], aggApp = createNode(0, true, keys, t) + dalc := getMockDALC(log.TestingLogger()) + nodes[0], aggApp = createNode(0, true, dalc, keys, t) for i := 1; i < num; i++ { - nodes[i], _ = createNode(i, false, keys, t) + nodes[i], _ = createNode(i, false, dalc, keys, t) } return nodes, aggApp } -func createNode(n int, aggregator bool, keys []crypto.PrivKey, t *testing.T) (*Node, *mocks.Application) { +func createNode(n int, aggregator bool, dalc da.DataAvailabilityLayerClient, keys []crypto.PrivKey, t *testing.T) (*Node, *mocks.Application) { t.Helper() require := require.New(t) // nodes will listen on consecutive ports on local interface @@ -77,7 +79,7 @@ func createNode(n int, aggregator bool, keys []crypto.PrivKey, t *testing.T) (*N p2pConfig := config.P2PConfig{ ListenAddress: "/ip4/127.0.0.1/tcp/" + strconv.Itoa(startPort+n), } - aggConfig := config.AggregatorConfig{ + bmConfig := config.BlockManagerConfig{ BlockTime: 200 * time.Millisecond, NamespaceID: [8]byte{8, 7, 6, 5, 4, 3, 2, 1}, } @@ -94,20 +96,18 @@ func createNode(n int, aggregator bool, keys []crypto.PrivKey, t *testing.T) (*N app := &mocks.Application{} app.On("CheckTx", mock.Anything).Return(abci.ResponseCheckTx{}) - if aggregator { - app.On("BeginBlock", mock.Anything).Return(abci.ResponseBeginBlock{}) - app.On("DeliverTx", mock.Anything).Return(abci.ResponseDeliverTx{}) - app.On("EndBlock", mock.Anything).Return(abci.ResponseEndBlock{}) - app.On("Commit", mock.Anything).Return(abci.ResponseCommit{}) - } + app.On("BeginBlock", mock.Anything).Return(abci.ResponseBeginBlock{}) + app.On("EndBlock", mock.Anything).Return(abci.ResponseEndBlock{}) + app.On("Commit", mock.Anything).Return(abci.ResponseCommit{}) + app.On("DeliverTx", mock.Anything).Return(abci.ResponseDeliverTx{}) node, err := NewNode( context.Background(), config.NodeConfig{ - P2P: p2pConfig, - DALayer: "mock", - Aggregator: aggregator, - AggregatorConfig: aggConfig, + P2P: p2pConfig, + DALayer: "mock", + Aggregator: aggregator, + BlockManagerConfig: bmConfig, }, keys[n], proxy.NewLocalClientCreator(app), @@ -116,5 +116,10 @@ func createNode(n int, aggregator bool, keys []crypto.PrivKey, t *testing.T) (*N require.NoError(err) require.NotNil(node) + // use same, common DALC, so nodes can share data + node.dalc = dalc + node.blockManager.dalc = dalc + node.blockManager.retriever = dalc.(da.BlockRetriever) + return node, app } diff --git a/node/node.go b/node/node.go index 3a4b088b2e..f98cbbbfa1 100644 --- a/node/node.go +++ b/node/node.go @@ -49,9 +49,9 @@ type Node struct { incomingHeaderCh chan *p2p.GossipMessage - Store store.Store - aggregator *aggregator - dalc da.DataAvailabilityLayerClient + Store store.Store + blockManager *blockManager + dalc da.DataAvailabilityLayerClient // keep context here only because of API compatibility // - it's used in `OnStart` (defined in service.Service interface) @@ -105,12 +105,9 @@ func NewNode(ctx context.Context, conf config.NodeConfig, nodeKey crypto.PrivKey client.SetTxValidator(txValidator) client.SetHeaderValidator(newHeaderValidator(logger)) - var aggregator *aggregator = nil - if conf.Aggregator { - aggregator, err = newAggregator(nodeKey, conf.AggregatorConfig, genesis, s, mp, proxyApp.Consensus(), dalc, logger.With("module", "aggregator")) - if err != nil { - return nil, fmt.Errorf("aggregator initialization error: %w", err) - } + blockManager, err := newBlockManager(nodeKey, conf.BlockManagerConfig, genesis, s, mp, proxyApp.Consensus(), dalc, logger.With("module", "BlockManager")) + if err != nil { + return nil, fmt.Errorf("BlockManager initialization error: %w", err) } node := &Node{ @@ -119,7 +116,7 @@ func NewNode(ctx context.Context, conf config.NodeConfig, nodeKey crypto.PrivKey genesis: genesis, conf: conf, P2P: client, - aggregator: aggregator, + blockManager: blockManager, dalc: dalc, Mempool: mp, mempoolIDs: mpIDs, @@ -128,6 +125,10 @@ func NewNode(ctx context.Context, conf config.NodeConfig, nodeKey crypto.PrivKey Store: s, ctx: ctx, } + node.P2P.SetHeaderHandler(func(msg *p2p.GossipMessage) { + node.incomingHeaderCh <- msg + }) + node.BaseService = *service.NewBaseService(logger, "Node", node) return node, nil @@ -137,28 +138,14 @@ func (n *Node) headerReadLoop(ctx context.Context) { for { select { case headerMsg := <-n.incomingHeaderCh: + n.Logger.Info("tzdybal") var header types.Header err := header.UnmarshalBinary(headerMsg.Data) if err != nil { n.Logger.Error("failed to deserialize header", "error", err) continue } - // header is already validated during libp2p pubsub validation phase - hash := header.Hash() - n.Logger.Debug("header details", "height", header.Height, "hash", hash, "lastHeaderHash", header.LastHeaderHash) - - // TODO(tzdybal): this is simplified version for MVP - blocks are fetched only via DALC - blockRes := n.dalc.(da.BlockRetriever).RetrieveBlock(header.Height) - var block *types.Block - switch blockRes.Code { - case da.StatusSuccess: - block = blockRes.Block - case da.StatusError: - case da.StatusTimeout: - default: - } - // TODO(tzdybal): actually apply block - n.Logger.Debug("full block", "block", block) + n.blockManager.headerInCh <- &header case <-ctx.Done(): break } @@ -168,7 +155,7 @@ func (n *Node) headerReadLoop(ctx context.Context) { func (n *Node) headerPublishLoop(ctx context.Context) { for { select { - case header := <-n.aggregator.headerCh: + case header := <-n.blockManager.headerOutCh: headerBytes, err := header.MarshalBinary() if err != nil { n.Logger.Error("failed to serialize block header", "error", err) @@ -195,12 +182,12 @@ func (n *Node) OnStart() error { return fmt.Errorf("error while starting data availability layer client: %w", err) } if n.conf.Aggregator { - go n.aggregator.aggregationLoop(n.ctx) + go n.blockManager.aggregationLoop(n.ctx) go n.headerPublishLoop(n.ctx) } - n.P2P.SetHeaderHandler(func(header *p2p.GossipMessage) { - n.incomingHeaderCh <- header - }) + go n.blockManager.retrieveLoop(n.ctx) + go n.blockManager.syncLoop(n.ctx) + go n.headerReadLoop(n.ctx) return nil From b81400c520eeaef952f54446655c1d4f40cd8f66 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Tomasz=20Zdyba=C5=82?= Date: Wed, 6 Oct 2021 13:16:50 +0200 Subject: [PATCH 2/9] lint --- node/aggregator.go | 2 +- node/aggregator_test.go | 4 ++-- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/node/aggregator.go b/node/aggregator.go index 4c37a4b6eb..c1421799c7 100644 --- a/node/aggregator.go +++ b/node/aggregator.go @@ -162,7 +162,7 @@ func (a *blockManager) syncLoop(ctx context.Context) { func (a *blockManager) retrieveLoop(ctx context.Context) { for { select { - case _ = <-a.retrieveCh: + case <-a.retrieveCh: // TODO(tzdybal): syncTarget should be atomic for h := a.store.Height() + 1; h <= a.syncTarget; h++ { a.logger.Debug("trying to retrieve block from DALC", "height", h) diff --git a/node/aggregator_test.go b/node/aggregator_test.go index 2610aa8b21..358f36e505 100644 --- a/node/aggregator_test.go +++ b/node/aggregator_test.go @@ -142,7 +142,7 @@ func TestInitialState(t *testing.T) { func getMockDALC(logger log.Logger) da.DataAvailabilityLayerClient { dalc := &mockda.MockDataAvailabilityLayerClient{} - dalc.Init(nil, nil, logger) - dalc.Start() + _ = dalc.Init(nil, nil, logger) + _ = dalc.Start() return dalc } From 56723caca87020f80f627c67e4b16017fb4a216c Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Tomasz=20Zdyba=C5=82?= Date: Wed, 6 Oct 2021 13:26:28 +0200 Subject: [PATCH 3/9] make syncTarget atomic --- node/aggregator.go | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/node/aggregator.go b/node/aggregator.go index c1421799c7..4b15e2e34c 100644 --- a/node/aggregator.go +++ b/node/aggregator.go @@ -3,6 +3,7 @@ package node import ( "context" "fmt" + "sync/atomic" "time" "github.com/libp2p/go-libp2p-core/crypto" @@ -120,8 +121,7 @@ func (a *blockManager) syncLoop(ctx context.Context) { newHeight := header.Height currentHeight := a.store.Height() if newHeight > currentHeight { - // TODO(tzdybal): syncTarget should be atomic - a.syncTarget = newHeight + atomic.StoreUint64(&a.syncTarget, newHeight) a.retrieveCh <- newHeight } case block := <-a.blockInCh: @@ -163,8 +163,8 @@ func (a *blockManager) retrieveLoop(ctx context.Context) { for { select { case <-a.retrieveCh: - // TODO(tzdybal): syncTarget should be atomic - for h := a.store.Height() + 1; h <= a.syncTarget; h++ { + target := atomic.LoadUint64(&a.syncTarget) + for h := a.store.Height() + 1; h <= target; h++ { a.logger.Debug("trying to retrieve block from DALC", "height", h) a.mustRetrieveBlock(ctx, h) } From aaa7809cf39523f5ce5fee7a778f5562c6571d54 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Tomasz=20Zdyba=C5=82?= Date: Thu, 7 Oct 2021 13:47:26 +0200 Subject: [PATCH 4/9] Finish refactoring aggregator to block.Manager --- node/aggregator.go => block/manager.go | 135 +++++++++--------- .../manager_test.go | 62 +------- node/integration_test.go | 61 +++++++- node/node.go | 15 +- 4 files changed, 138 insertions(+), 135 deletions(-) rename node/aggregator.go => block/manager.go (57%) rename node/aggregator_test.go => block/manager_test.go (55%) diff --git a/node/aggregator.go b/block/manager.go similarity index 57% rename from node/aggregator.go rename to block/manager.go index 4b15e2e34c..f4953ac3ad 100644 --- a/node/aggregator.go +++ b/block/manager.go @@ -1,4 +1,4 @@ -package node +package block import ( "context" @@ -19,8 +19,8 @@ import ( "github.com/celestiaorg/optimint/types" ) -// blockManager is responsible for aggregating transactions into blocks. -type blockManager struct { +// Manager is responsible for aggregating transactions into blocks. +type Manager struct { lastState state.State conf config.BlockManagerConfig @@ -34,8 +34,8 @@ type blockManager struct { dalc da.DataAvailabilityLayerClient retriever da.BlockRetriever - headerOutCh chan *types.Header - headerInCh chan *types.Header + HeaderOutCh chan *types.Header + HeaderInCh chan *types.Header syncTarget uint64 blockInCh chan *types.Block @@ -54,7 +54,7 @@ func getInitialState(store store.Store, genesis *lltypes.GenesisDoc) (state.Stat return s, err } -func newBlockManager( +func NewManager( proposerKey crypto.PrivKey, conf config.BlockManagerConfig, genesis *lltypes.GenesisDoc, @@ -63,7 +63,7 @@ func newBlockManager( proxyApp proxy.AppConnConsensus, dalc da.DataAvailabilityLayerClient, logger log.Logger, -) (*blockManager, error) { +) (*Manager, error) { s, err := getInitialState(store, genesis) if err != nil { return nil, err @@ -76,7 +76,7 @@ func newBlockManager( exec := state.NewBlockExecutor(proposerAddress, conf.NamespaceID, mempool, proxyApp, logger) - agg := &blockManager{ + agg := &Manager{ proposerKey: proposerKey, conf: conf, genesis: genesis, @@ -85,8 +85,8 @@ func newBlockManager( executor: exec, dalc: dalc, retriever: dalc.(da.BlockRetriever), // TODO(tzdybal): do it in more gentle way (after MVP) - headerOutCh: make(chan *types.Header), - headerInCh: make(chan *types.Header), + HeaderOutCh: make(chan *types.Header), + HeaderInCh: make(chan *types.Header), blockInCh: make(chan *types.Block), retrieveCh: make(chan uint64), syncCache: make(map[uint64]*types.Block), @@ -96,62 +96,67 @@ func newBlockManager( return agg, nil } -func (a *blockManager) aggregationLoop(ctx context.Context) { - timer := time.NewTimer(a.conf.BlockTime) +func (m *Manager) SetDALC(dalc da.DataAvailabilityLayerClient) { + m.dalc = dalc + m.retriever = dalc.(da.BlockRetriever) +} + +func (m *Manager) AggregationLoop(ctx context.Context) { + timer := time.NewTimer(m.conf.BlockTime) for { select { case <-ctx.Done(): return case <-timer.C: start := time.Now() - err := a.publishBlock(ctx) + err := m.publishBlock(ctx) if err != nil { - a.logger.Error("error while publishing block", "error", err) + m.logger.Error("error while publishing block", "error", err) } - timer.Reset(a.getRemainingSleep(start)) + timer.Reset(m.getRemainingSleep(start)) } } } -func (a *blockManager) syncLoop(ctx context.Context) { +func (m *Manager) SyncLoop(ctx context.Context) { for { select { - case header := <-a.headerInCh: - a.logger.Debug("block header received", "height", header.Height, "hash", header.Hash()) + case header := <-m.HeaderInCh: + m.logger.Debug("block header received", "height", header.Height, "hash", header.Hash()) newHeight := header.Height - currentHeight := a.store.Height() + currentHeight := m.store.Height() if newHeight > currentHeight { - atomic.StoreUint64(&a.syncTarget, newHeight) - a.retrieveCh <- newHeight + atomic.StoreUint64(&m.syncTarget, newHeight) + m.retrieveCh <- newHeight } - case block := <-a.blockInCh: - a.logger.Debug("block body retrieved from DALC", + case block := <-m.blockInCh: + m.logger.Debug("block body retrieved from DALC", "height", block.Header.Height, "hash", block.Hash(), ) - a.syncCache[block.Header.Height] = block - currentHeight := a.store.Height() // TODO(tzdybal): maybe store a copy in memory - b1, ok1 := a.syncCache[currentHeight+1] - b2, ok2 := a.syncCache[currentHeight+2] + m.syncCache[block.Header.Height] = block + currentHeight := m.store.Height() // TODO(tzdybal): maybe store a copy in memory + b1, ok1 := m.syncCache[currentHeight+1] + b2, ok2 := m.syncCache[currentHeight+2] if ok1 && ok2 { - newState, _, err := a.executor.ApplyBlock(ctx, a.lastState, b1) + newState, _, err := m.executor.ApplyBlock(ctx, m.lastState, b1) if err != nil { - a.logger.Error("failed to ApplyBlock", "error", err) + m.logger.Error("failed to ApplyBlock", "error", err) continue } - err = a.store.SaveBlock(b1, &b2.LastCommit) + err = m.store.SaveBlock(b1, &b2.LastCommit) if err != nil { - a.logger.Error("failed to save block", "error", err) + m.logger.Error("failed to save block", "error", err) continue } - a.lastState = newState - err = a.store.UpdateState(a.lastState) + m.lastState = newState + err = m.store.UpdateState(m.lastState) if err != nil { - a.logger.Error("failed to save updated state", "error", err) + m.logger.Error("failed to save updated state", "error", err) continue } - delete(a.syncCache, currentHeight+1) + delete(m.syncCache, currentHeight+1) } case <-ctx.Done(): return @@ -159,14 +164,14 @@ func (a *blockManager) syncLoop(ctx context.Context) { } } -func (a *blockManager) retrieveLoop(ctx context.Context) { +func (m *Manager) RetrieveLoop(ctx context.Context) { for { select { - case <-a.retrieveCh: - target := atomic.LoadUint64(&a.syncTarget) - for h := a.store.Height() + 1; h <= target; h++ { - a.logger.Debug("trying to retrieve block from DALC", "height", h) - a.mustRetrieveBlock(ctx, h) + case <-m.retrieveCh: + target := atomic.LoadUint64(&m.syncTarget) + for h := m.store.Height() + 1; h <= target; h++ { + m.logger.Debug("trying to retrieve block from DALC", "height", h) + m.mustRetrieveBlock(ctx, h) } case <-ctx.Done(): return @@ -174,12 +179,12 @@ func (a *blockManager) retrieveLoop(ctx context.Context) { } } -func (a *blockManager) mustRetrieveBlock(ctx context.Context, height uint64) { +func (m *Manager) mustRetrieveBlock(ctx context.Context, height uint64) { // TOOD(tzdybal): extract configuration option maxRetries := 10 for r := 0; r < maxRetries; r++ { - err := a.fetchBlock(ctx, height) + err := m.fetchBlock(ctx, height) if err == nil { return } @@ -191,12 +196,12 @@ func (a *blockManager) mustRetrieveBlock(ctx context.Context, height uint64) { panic("failed to retrieve block with DALC") } -func (a *blockManager) fetchBlock(ctx context.Context, height uint64) error { +func (m *Manager) fetchBlock(ctx context.Context, height uint64) error { var err error - blockRes := a.retriever.RetrieveBlock(height) + blockRes := m.retriever.RetrieveBlock(height) switch blockRes.Code { case da.StatusSuccess: - a.blockInCh <- blockRes.Block + m.blockInCh <- blockRes.Block case da.StatusError: err = fmt.Errorf("failed to retrieve block: %s", blockRes.Message) case da.StatusTimeout: @@ -205,35 +210,35 @@ func (a *blockManager) fetchBlock(ctx context.Context, height uint64) error { return err } -func (a *blockManager) getRemainingSleep(start time.Time) time.Duration { +func (m *Manager) getRemainingSleep(start time.Time) time.Duration { publishingDuration := time.Since(start) - sleepDuration := a.conf.BlockTime - publishingDuration + sleepDuration := m.conf.BlockTime - publishingDuration if sleepDuration < 0 { sleepDuration = 0 } return sleepDuration } -func (a *blockManager) publishBlock(ctx context.Context) error { +func (m *Manager) publishBlock(ctx context.Context) error { var lastCommit *types.Commit var err error - newHeight := a.store.Height() + 1 + newHeight := m.store.Height() + 1 // this is a special case, when first block is produced - there is no previous commit - if newHeight == uint64(a.genesis.InitialHeight) { - lastCommit = &types.Commit{Height: a.store.Height(), HeaderHash: [32]byte{}} + if newHeight == uint64(m.genesis.InitialHeight) { + lastCommit = &types.Commit{Height: m.store.Height(), HeaderHash: [32]byte{}} } else { - lastCommit, err = a.store.LoadCommit(a.store.Height()) + lastCommit, err = m.store.LoadCommit(m.store.Height()) if err != nil { return fmt.Errorf("error while loading last commit: %w", err) } } - a.logger.Info("Creating and publishing block", "height", newHeight) + m.logger.Info("Creating and publishing block", "height", newHeight) - block := a.executor.CreateBlock(newHeight, lastCommit, a.lastState) - a.logger.Debug("block info", "num_tx", len(block.Data.Txs)) - newState, _, err := a.executor.ApplyBlock(ctx, a.lastState, block) + block := m.executor.CreateBlock(newHeight, lastCommit, m.lastState) + m.logger.Debug("block info", "num_tx", len(block.Data.Txs)) + newState, _, err := m.executor.ApplyBlock(ctx, m.lastState, block) if err != nil { return err } @@ -242,7 +247,7 @@ func (a *blockManager) publishBlock(ctx context.Context) error { if err != nil { return err } - sign, err := a.proposerKey.Sign(headerBytes) + sign, err := m.proposerKey.Sign(headerBytes) if err != nil { return err } @@ -252,27 +257,27 @@ func (a *blockManager) publishBlock(ctx context.Context) error { HeaderHash: block.Header.Hash(), Signatures: []types.Signature{sign}, } - err = a.store.SaveBlock(block, commit) + err = m.store.SaveBlock(block, commit) if err != nil { return err } - a.lastState = newState - err = a.store.UpdateState(a.lastState) + m.lastState = newState + err = m.store.UpdateState(m.lastState) if err != nil { return err } - return a.broadcastBlock(ctx, block) + return m.broadcastBlock(ctx, block) } -func (a *blockManager) broadcastBlock(ctx context.Context, block *types.Block) error { - res := a.dalc.SubmitBlock(block) +func (m *Manager) broadcastBlock(ctx context.Context, block *types.Block) error { + res := m.dalc.SubmitBlock(block) if res.Code != da.StatusSuccess { return fmt.Errorf("DA layer submission failed: %s", res.Message) } - a.headerOutCh <- &block.Header + m.HeaderOutCh <- &block.Header return nil } diff --git a/node/aggregator_test.go b/block/manager_test.go similarity index 55% rename from node/aggregator_test.go rename to block/manager_test.go index 358f36e505..5d37de6f8b 100644 --- a/node/aggregator_test.go +++ b/block/manager_test.go @@ -1,81 +1,23 @@ -package node +package block import ( - "context" "crypto/rand" - mrand "math/rand" "testing" "time" "github.com/libp2p/go-libp2p-core/crypto" - "github.com/libp2p/go-libp2p-core/peer" "github.com/stretchr/testify/assert" - "github.com/stretchr/testify/mock" "github.com/stretchr/testify/require" - abci "github.com/tendermint/tendermint/abci/types" "github.com/tendermint/tendermint/libs/log" - "github.com/tendermint/tendermint/proxy" "github.com/tendermint/tendermint/types" "github.com/celestiaorg/optimint/config" "github.com/celestiaorg/optimint/da" mockda "github.com/celestiaorg/optimint/da/mock" - "github.com/celestiaorg/optimint/mocks" - "github.com/celestiaorg/optimint/p2p" "github.com/celestiaorg/optimint/state" "github.com/celestiaorg/optimint/store" ) -func TestAggregatorMode(t *testing.T) { - assert := assert.New(t) - require := require.New(t) - - app := &mocks.Application{} - app.On("CheckTx", mock.Anything).Return(abci.ResponseCheckTx{}) - app.On("BeginBlock", mock.Anything).Return(abci.ResponseBeginBlock{}) - app.On("DeliverTx", mock.Anything).Return(abci.ResponseDeliverTx{}) - app.On("EndBlock", mock.Anything).Return(abci.ResponseEndBlock{}) - app.On("Commit", mock.Anything).Return(abci.ResponseCommit{}) - - key, _, _ := crypto.GenerateEd25519Key(rand.Reader) - anotherKey, _, _ := crypto.GenerateEd25519Key(rand.Reader) - - blockManagerConfig := config.BlockManagerConfig{ - BlockTime: 500 * time.Millisecond, - NamespaceID: [8]byte{1, 2, 3, 4, 5, 6, 7, 8}, - } - node, err := NewNode(context.Background(), config.NodeConfig{DALayer: "mock", Aggregator: true, BlockManagerConfig: blockManagerConfig}, key, proxy.NewLocalClientCreator(app), &types.GenesisDoc{ChainID: "test"}, log.TestingLogger()) - require.NoError(err) - require.NotNil(node) - - assert.False(node.IsRunning()) - - err = node.Start() - assert.NoError(err) - defer func() { - err := node.Stop() - assert.NoError(err) - }() - assert.True(node.IsRunning()) - - pid, err := peer.IDFromPrivateKey(anotherKey) - require.NoError(err) - ctx, cancel := context.WithCancel(context.TODO()) - go func() { - for { - select { - case <-ctx.Done(): - return - default: - node.incomingTxCh <- &p2p.GossipMessage{Data: []byte(time.Now().String()), From: pid} - time.Sleep(time.Duration(mrand.Uint32()%20) * time.Millisecond) - } - } - }() - time.Sleep(5 * time.Second) - cancel() -} - func TestInitialState(t *testing.T) { genesis := &types.GenesisDoc{ ChainID: "genesis id", @@ -130,7 +72,7 @@ func TestInitialState(t *testing.T) { assert := assert.New(t) logger := log.TestingLogger() dalc := getMockDALC(logger) - agg, err := newBlockManager(key, conf, c.genesis, c.store, nil, nil, dalc, logger) + agg, err := NewManager(key, conf, c.genesis, c.store, nil, nil, dalc, logger) assert.NoError(err) assert.NotNil(agg) assert.Equal(c.expectedChainID, agg.lastState.ChainID) diff --git a/node/integration_test.go b/node/integration_test.go index 7d203972df..02b74695bb 100644 --- a/node/integration_test.go +++ b/node/integration_test.go @@ -3,6 +3,10 @@ package node import ( "context" "crypto/rand" + mockda "github.com/celestiaorg/optimint/da/mock" + "github.com/celestiaorg/optimint/p2p" + "github.com/stretchr/testify/assert" + mrand "math/rand" "strconv" "strings" "testing" @@ -22,6 +26,56 @@ import ( "github.com/celestiaorg/optimint/mocks" ) +func TestAggregatorMode(t *testing.T) { + assert := assert.New(t) + require := require.New(t) + + app := &mocks.Application{} + app.On("CheckTx", mock.Anything).Return(abci.ResponseCheckTx{}) + app.On("BeginBlock", mock.Anything).Return(abci.ResponseBeginBlock{}) + app.On("DeliverTx", mock.Anything).Return(abci.ResponseDeliverTx{}) + app.On("EndBlock", mock.Anything).Return(abci.ResponseEndBlock{}) + app.On("Commit", mock.Anything).Return(abci.ResponseCommit{}) + + key, _, _ := crypto.GenerateEd25519Key(rand.Reader) + anotherKey, _, _ := crypto.GenerateEd25519Key(rand.Reader) + + blockManagerConfig := config.BlockManagerConfig{ + BlockTime: 500 * time.Millisecond, + NamespaceID: [8]byte{1, 2, 3, 4, 5, 6, 7, 8}, + } + node, err := NewNode(context.Background(), config.NodeConfig{DALayer: "mock", Aggregator: true, BlockManagerConfig: blockManagerConfig}, key, proxy.NewLocalClientCreator(app), &types.GenesisDoc{ChainID: "test"}, log.TestingLogger()) + require.NoError(err) + require.NotNil(node) + + assert.False(node.IsRunning()) + + err = node.Start() + assert.NoError(err) + defer func() { + err := node.Stop() + assert.NoError(err) + }() + assert.True(node.IsRunning()) + + pid, err := peer.IDFromPrivateKey(anotherKey) + require.NoError(err) + ctx, cancel := context.WithCancel(context.TODO()) + go func() { + for { + select { + case <-ctx.Done(): + return + default: + node.incomingTxCh <- &p2p.GossipMessage{Data: []byte(time.Now().String()), From: pid} + time.Sleep(time.Duration(mrand.Uint32()%20) * time.Millisecond) + } + } + }() + time.Sleep(5 * time.Second) + cancel() +} + // TestTxGossipingAndAggregation setups a network of nodes, with single aggregator and multiple producers. // Nodes should gossip transactions and aggregator node should produce blocks. func TestTxGossipingAndAggregation(t *testing.T) { @@ -61,7 +115,9 @@ func createNodes(num int, t *testing.T) ([]*Node, *mocks.Application) { nodes := make([]*Node, num) var aggApp *mocks.Application - dalc := getMockDALC(log.TestingLogger()) + dalc := &mockda.MockDataAvailabilityLayerClient{} + _ = dalc.Init(nil, nil, log.TestingLogger()) + _ = dalc.Start() nodes[0], aggApp = createNode(0, true, dalc, keys, t) for i := 1; i < num; i++ { nodes[i], _ = createNode(i, false, dalc, keys, t) @@ -118,8 +174,7 @@ func createNode(n int, aggregator bool, dalc da.DataAvailabilityLayerClient, key // use same, common DALC, so nodes can share data node.dalc = dalc - node.blockManager.dalc = dalc - node.blockManager.retriever = dalc.(da.BlockRetriever) + node.blockManager.SetDALC(dalc) return node, app } diff --git a/node/node.go b/node/node.go index f98cbbbfa1..a2758a0709 100644 --- a/node/node.go +++ b/node/node.go @@ -4,6 +4,7 @@ import ( "context" "errors" "fmt" + "github.com/celestiaorg/optimint/block" "github.com/libp2p/go-libp2p-core/crypto" abci "github.com/tendermint/tendermint/abci/types" @@ -50,7 +51,7 @@ type Node struct { incomingHeaderCh chan *p2p.GossipMessage Store store.Store - blockManager *blockManager + blockManager *block.Manager dalc da.DataAvailabilityLayerClient // keep context here only because of API compatibility @@ -105,7 +106,7 @@ func NewNode(ctx context.Context, conf config.NodeConfig, nodeKey crypto.PrivKey client.SetTxValidator(txValidator) client.SetHeaderValidator(newHeaderValidator(logger)) - blockManager, err := newBlockManager(nodeKey, conf.BlockManagerConfig, genesis, s, mp, proxyApp.Consensus(), dalc, logger.With("module", "BlockManager")) + blockManager, err := block.NewManager(nodeKey, conf.BlockManagerConfig, genesis, s, mp, proxyApp.Consensus(), dalc, logger.With("module", "BlockManager")) if err != nil { return nil, fmt.Errorf("BlockManager initialization error: %w", err) } @@ -145,7 +146,7 @@ func (n *Node) headerReadLoop(ctx context.Context) { n.Logger.Error("failed to deserialize header", "error", err) continue } - n.blockManager.headerInCh <- &header + n.blockManager.HeaderInCh <- &header case <-ctx.Done(): break } @@ -155,7 +156,7 @@ func (n *Node) headerReadLoop(ctx context.Context) { func (n *Node) headerPublishLoop(ctx context.Context) { for { select { - case header := <-n.blockManager.headerOutCh: + case header := <-n.blockManager.HeaderOutCh: headerBytes, err := header.MarshalBinary() if err != nil { n.Logger.Error("failed to serialize block header", "error", err) @@ -182,11 +183,11 @@ func (n *Node) OnStart() error { return fmt.Errorf("error while starting data availability layer client: %w", err) } if n.conf.Aggregator { - go n.blockManager.aggregationLoop(n.ctx) + go n.blockManager.AggregationLoop(n.ctx) go n.headerPublishLoop(n.ctx) } - go n.blockManager.retrieveLoop(n.ctx) - go n.blockManager.syncLoop(n.ctx) + go n.blockManager.RetrieveLoop(n.ctx) + go n.blockManager.SyncLoop(n.ctx) go n.headerReadLoop(n.ctx) From a689bad85e295ef187ed54070aaf67d368f27ccb Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Tomasz=20Zdyba=C5=82?= Date: Thu, 7 Oct 2021 14:26:51 +0200 Subject: [PATCH 5/9] Add blocks related assertions to integration test --- node/integration_test.go | 49 +++++++++++++++++++++++++++++++++++----- node/node.go | 1 - 2 files changed, 43 insertions(+), 7 deletions(-) diff --git a/node/integration_test.go b/node/integration_test.go index 02b74695bb..035590d9b3 100644 --- a/node/integration_test.go +++ b/node/integration_test.go @@ -79,9 +79,10 @@ func TestAggregatorMode(t *testing.T) { // TestTxGossipingAndAggregation setups a network of nodes, with single aggregator and multiple producers. // Nodes should gossip transactions and aggregator node should produce blocks. func TestTxGossipingAndAggregation(t *testing.T) { + assert := assert.New(t) require := require.New(t) - nodes, aggApp := createNodes(11, t) + nodes, apps := createNodes(11, t) for _, n := range nodes { require.NoError(n.Start()) @@ -99,12 +100,48 @@ func TestTxGossipingAndAggregation(t *testing.T) { for _, n := range nodes { require.NoError(n.Stop()) } + aggApp := apps[0] + apps = apps[1:] aggApp.AssertNumberOfCalls(t, "DeliverTx", 10) aggApp.AssertExpectations(t) + + for i, app := range apps { + app.AssertNumberOfCalls(t, "DeliverTx", 10) + app.AssertExpectations(t) + + // assert that we have most of the blocks from aggregator + beginCnt := 0 + endCnt := 0 + commitCnt := 0 + for _, call := range app.Calls { + switch call.Method { + case "BeginBlock": + beginCnt++ + case "EndBlock": + endCnt++ + case "Commit": + commitCnt++ + } + } + aggregatorHeight := nodes[0].Store.Height() + adjustedHeight := int(aggregatorHeight - 3) // 3 is completely arbitrary + assert.GreaterOrEqual(beginCnt, adjustedHeight) + assert.GreaterOrEqual(endCnt, adjustedHeight) + assert.GreaterOrEqual(commitCnt, adjustedHeight) + + // assert that all blocks known to node are same as produced by aggregator + for h := uint64(1); h <= nodes[i].Store.Height(); h++ { + nodeBlock, err := nodes[i].Store.LoadBlock(h) + require.NoError(err) + aggBlock, err := nodes[0].Store.LoadBlock(h) + require.NoError(err) + assert.Equal(aggBlock, nodeBlock) + } + } } -func createNodes(num int, t *testing.T) ([]*Node, *mocks.Application) { +func createNodes(num int, t *testing.T) ([]*Node, []*mocks.Application) { t.Helper() // create keys first, as they are required for P2P connections @@ -114,16 +151,16 @@ func createNodes(num int, t *testing.T) ([]*Node, *mocks.Application) { } nodes := make([]*Node, num) - var aggApp *mocks.Application + apps := make([]*mocks.Application, num) dalc := &mockda.MockDataAvailabilityLayerClient{} _ = dalc.Init(nil, nil, log.TestingLogger()) _ = dalc.Start() - nodes[0], aggApp = createNode(0, true, dalc, keys, t) + nodes[0], apps[0] = createNode(0, true, dalc, keys, t) for i := 1; i < num; i++ { - nodes[i], _ = createNode(i, false, dalc, keys, t) + nodes[i], apps[i] = createNode(i, false, dalc, keys, t) } - return nodes, aggApp + return nodes, apps } func createNode(n int, aggregator bool, dalc da.DataAvailabilityLayerClient, keys []crypto.PrivKey, t *testing.T) (*Node, *mocks.Application) { diff --git a/node/node.go b/node/node.go index a2758a0709..34602a9b06 100644 --- a/node/node.go +++ b/node/node.go @@ -139,7 +139,6 @@ func (n *Node) headerReadLoop(ctx context.Context) { for { select { case headerMsg := <-n.incomingHeaderCh: - n.Logger.Info("tzdybal") var header types.Header err := header.UnmarshalBinary(headerMsg.Data) if err != nil { From 9748ceaf64ce96a50d04cc3b0fb063d9e68e172c Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Tomasz=20Zdyba=C5=82?= Date: Fri, 8 Oct 2021 22:00:16 +0200 Subject: [PATCH 6/9] Slow down for GitHub actions --- node/integration_test.go | 12 +++++++----- 1 file changed, 7 insertions(+), 5 deletions(-) diff --git a/node/integration_test.go b/node/integration_test.go index 035590d9b3..905eb996fc 100644 --- a/node/integration_test.go +++ b/node/integration_test.go @@ -41,7 +41,7 @@ func TestAggregatorMode(t *testing.T) { anotherKey, _, _ := crypto.GenerateEd25519Key(rand.Reader) blockManagerConfig := config.BlockManagerConfig{ - BlockTime: 500 * time.Millisecond, + BlockTime: 1 * time.Second, NamespaceID: [8]byte{1, 2, 3, 4, 5, 6, 7, 8}, } node, err := NewNode(context.Background(), config.NodeConfig{DALayer: "mock", Aggregator: true, BlockManagerConfig: blockManagerConfig}, key, proxy.NewLocalClientCreator(app), &types.GenesisDoc{ChainID: "test"}, log.TestingLogger()) @@ -82,7 +82,8 @@ func TestTxGossipingAndAggregation(t *testing.T) { assert := assert.New(t) require := require.New(t) - nodes, apps := createNodes(11, t) + clientNodes := 4 + nodes, apps := createNodes(clientNodes+1, t) for _, n := range nodes { require.NoError(n.Start()) @@ -103,11 +104,11 @@ func TestTxGossipingAndAggregation(t *testing.T) { aggApp := apps[0] apps = apps[1:] - aggApp.AssertNumberOfCalls(t, "DeliverTx", 10) + aggApp.AssertNumberOfCalls(t, "DeliverTx", clientNodes) aggApp.AssertExpectations(t) for i, app := range apps { - app.AssertNumberOfCalls(t, "DeliverTx", 10) + app.AssertNumberOfCalls(t, "DeliverTx", clientNodes) app.AssertExpectations(t) // assert that we have most of the blocks from aggregator @@ -173,7 +174,7 @@ func createNode(n int, aggregator bool, dalc da.DataAvailabilityLayerClient, key ListenAddress: "/ip4/127.0.0.1/tcp/" + strconv.Itoa(startPort+n), } bmConfig := config.BlockManagerConfig{ - BlockTime: 200 * time.Millisecond, + BlockTime: 1 * time.Second, NamespaceID: [8]byte{8, 7, 6, 5, 4, 3, 2, 1}, } for i := 0; i < len(keys); i++ { @@ -215,3 +216,4 @@ func createNode(n int, aggregator bool, dalc da.DataAvailabilityLayerClient, key return node, app } + From ea71ca6636d17bd5053791bd01c7a6ecf6093b4c Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Tomasz=20Zdyba=C5=82?= Date: Mon, 11 Oct 2021 21:29:53 +0200 Subject: [PATCH 7/9] lltypes -> tmtypes --- block/manager.go | 8 ++++---- node/node.go | 13 +++++++------ state/executor.go | 12 ++++++------ 3 files changed, 17 insertions(+), 16 deletions(-) diff --git a/block/manager.go b/block/manager.go index f4953ac3ad..9fffea79b4 100644 --- a/block/manager.go +++ b/block/manager.go @@ -8,7 +8,7 @@ import ( "github.com/libp2p/go-libp2p-core/crypto" "github.com/tendermint/tendermint/proxy" - lltypes "github.com/tendermint/tendermint/types" + tmtypes "github.com/tendermint/tendermint/types" "github.com/celestiaorg/optimint/config" "github.com/celestiaorg/optimint/da" @@ -24,7 +24,7 @@ type Manager struct { lastState state.State conf config.BlockManagerConfig - genesis *lltypes.GenesisDoc + genesis *tmtypes.GenesisDoc proposerKey crypto.PrivKey @@ -46,7 +46,7 @@ type Manager struct { } // getInitialState tries to load lastState from Store, and if it's not available it reads GenesisDoc. -func getInitialState(store store.Store, genesis *lltypes.GenesisDoc) (state.State, error) { +func getInitialState(store store.Store, genesis *tmtypes.GenesisDoc) (state.State, error) { s, err := store.LoadState() if err != nil { s, err = state.NewFromGenesisDoc(genesis) @@ -57,7 +57,7 @@ func getInitialState(store store.Store, genesis *lltypes.GenesisDoc) (state.Stat func NewManager( proposerKey crypto.PrivKey, conf config.BlockManagerConfig, - genesis *lltypes.GenesisDoc, + genesis *tmtypes.GenesisDoc, store store.Store, mempool mempool.Mempool, proxyApp proxy.AppConnConsensus, diff --git a/node/node.go b/node/node.go index 34602a9b06..5a5fc5fd7b 100644 --- a/node/node.go +++ b/node/node.go @@ -4,6 +4,7 @@ import ( "context" "errors" "fmt" + "github.com/celestiaorg/optimint/block" "github.com/libp2p/go-libp2p-core/crypto" @@ -13,7 +14,7 @@ import ( "github.com/tendermint/tendermint/libs/service" corep2p "github.com/tendermint/tendermint/p2p" "github.com/tendermint/tendermint/proxy" - lltypes "github.com/tendermint/tendermint/types" + tmtypes "github.com/tendermint/tendermint/types" "go.uber.org/multierr" "github.com/celestiaorg/optimint/config" @@ -35,10 +36,10 @@ var ( // It connects all the components and orchestrates their work. type Node struct { service.BaseService - eventBus *lltypes.EventBus + eventBus *tmtypes.EventBus proxyApp proxy.AppConns - genesis *lltypes.GenesisDoc + genesis *tmtypes.GenesisDoc conf config.NodeConfig P2P *p2p.Client @@ -60,14 +61,14 @@ type Node struct { } // NewNode creates new Optimint node. -func NewNode(ctx context.Context, conf config.NodeConfig, nodeKey crypto.PrivKey, clientCreator proxy.ClientCreator, genesis *lltypes.GenesisDoc, logger log.Logger) (*Node, error) { +func NewNode(ctx context.Context, conf config.NodeConfig, nodeKey crypto.PrivKey, clientCreator proxy.ClientCreator, genesis *tmtypes.GenesisDoc, logger log.Logger) (*Node, error) { proxyApp := proxy.NewAppConns(clientCreator) proxyApp.SetLogger(logger.With("module", "proxy")) if err := proxyApp.Start(); err != nil { return nil, fmt.Errorf("error starting proxy app connections: %w", err) } - eventBus := lltypes.NewEventBus() + eventBus := tmtypes.NewEventBus() eventBus.SetLogger(logger.With("module", "events")) if err := eventBus.Start(); err != nil { return nil, err @@ -216,7 +217,7 @@ func (n *Node) GetLogger() log.Logger { } // EventBus gives access to Node's event bus. -func (n *Node) EventBus() *lltypes.EventBus { +func (n *Node) EventBus() *tmtypes.EventBus { return n.eventBus } diff --git a/state/executor.go b/state/executor.go index 55280ee2fc..9525d6f8d2 100644 --- a/state/executor.go +++ b/state/executor.go @@ -9,7 +9,7 @@ import ( abci "github.com/tendermint/tendermint/abci/types" tmstate "github.com/tendermint/tendermint/proto/tendermint/state" "github.com/tendermint/tendermint/proxy" - lltypes "github.com/tendermint/tendermint/types" + tmtypes "github.com/tendermint/tendermint/types" abciconv "github.com/celestiaorg/optimint/conv/abci" "github.com/celestiaorg/optimint/log" @@ -108,7 +108,7 @@ func (e *BlockExecutor) updateState(state State, block *types.Block, abciRespons InitialHeight: state.InitialHeight, LastBlockHeight: int64(block.Header.Height), LastBlockTime: time.Unix(int64(block.Header.Time), 0), - LastBlockID: lltypes.BlockID{ + LastBlockID: tmtypes.BlockID{ Hash: hash[:], // for now, we don't care about part set headers }, @@ -116,7 +116,7 @@ func (e *BlockExecutor) updateState(state State, block *types.Block, abciRespons ConsensusParams: state.ConsensusParams, LastHeightConsensusParamsChanged: state.LastHeightConsensusParamsChanged, } - copy(s.LastResultsHash[:], lltypes.NewResults(abciResponses.DeliverTxs).Hash()) + copy(s.LastResultsHash[:], tmtypes.NewResults(abciResponses.DeliverTxs).Hash()) return s, nil } @@ -229,7 +229,7 @@ func (e *BlockExecutor) execute(ctx context.Context, state State, block *types.B return abciResponses, nil } -func toOptimintTxs(txs lltypes.Txs) types.Txs { +func toOptimintTxs(txs tmtypes.Txs) types.Txs { optiTxs := make(types.Txs, len(txs)) for i := range txs { optiTxs[i] = []byte(txs[i]) @@ -237,8 +237,8 @@ func toOptimintTxs(txs lltypes.Txs) types.Txs { return optiTxs } -func fromOptimintTxs(optiTxs types.Txs) lltypes.Txs { - txs := make(lltypes.Txs, len(optiTxs)) +func fromOptimintTxs(optiTxs types.Txs) tmtypes.Txs { + txs := make(tmtypes.Txs, len(optiTxs)) for i := range optiTxs { txs[i] = []byte(optiTxs[i]) } From 7f61c8f26274adf5de2f69b3667bd5bcd62943fd Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Tomasz=20Zdyba=C5=82?= Date: Mon, 11 Oct 2021 21:35:24 +0200 Subject: [PATCH 8/9] add comment --- block/manager.go | 3 +++ 1 file changed, 3 insertions(+) diff --git a/block/manager.go b/block/manager.go index 9fffea79b4..3f3ab671ff 100644 --- a/block/manager.go +++ b/block/manager.go @@ -125,6 +125,9 @@ func (m *Manager) SyncLoop(ctx context.Context) { m.logger.Debug("block header received", "height", header.Height, "hash", header.Hash()) newHeight := header.Height currentHeight := m.store.Height() + // in case of client reconnecting after being offline + // newHeight may be significantly larger than currentHeight + // it's handled gently in RetrieveLoop if newHeight > currentHeight { atomic.StoreUint64(&m.syncTarget, newHeight) m.retrieveCh <- newHeight From f91bef090d9cac132337f529d15697d7573d05d4 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Tomasz=20Zdyba=C5=82?= Date: Mon, 11 Oct 2021 21:35:52 +0200 Subject: [PATCH 9/9] Improve cross-package code coverage with go-acc --- .github/workflows/test.yml | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/.github/workflows/test.yml b/.github/workflows/test.yml index d6abd37e3c..87a079e8a4 100644 --- a/.github/workflows/test.yml +++ b/.github/workflows/test.yml @@ -22,7 +22,9 @@ jobs: run: go build -v ./... - name: Test & Coverage - run: go test -race -coverprofile=coverage.txt -covermode=atomic -v ./... + run: | + go install github.com/ory/go-acc@v0.2.6 + go-acc -o coverage.txt ./... -- -v --race - uses: codecov/codecov-action@v2.1.0 with: