diff --git a/block/aggregation.go b/block/aggregation.go index 29dcd86610..29e2dd58d8 100644 --- a/block/aggregation.go +++ b/block/aggregation.go @@ -36,7 +36,7 @@ func (m *Manager) AggregationLoop(ctx context.Context) { // Lazy Aggregator mode. // In Lazy Aggregator mode, blocks are built only when there are // transactions or every LazyBlockTime. - if m.config.Node.LazyAggregator { + if m.config.Node.LazyMode { m.lazyAggregationLoop(ctx, blockTimer) return } @@ -55,21 +55,38 @@ func (m *Manager) lazyAggregationLoop(ctx context.Context, blockTimer *time.Time return case <-lazyTimer.C: + m.logger.Debug("Lazy timer triggered block production") + m.produceBlock(ctx, "lazy_timer", lazyTimer, blockTimer) + case <-blockTimer.C: + if m.txsAvailable { + m.produceBlock(ctx, "block_timer", lazyTimer, blockTimer) + m.txsAvailable = false + } else { + // Ensure we keep ticking even when there are no txs + blockTimer.Reset(m.config.Node.BlockTime.Duration) + } + case <-m.txNotifyCh: + m.txsAvailable = true } + } +} - // Reset the start time - start := time.Now() +// produceBlock handles the common logic for producing a block and resetting timers +func (m *Manager) produceBlock(ctx context.Context, mode string, lazyTimer, blockTimer *time.Timer) { + // Record the start time + start := time.Now() - // Attempt to publish the block regardless of activity - if err := m.publishBlock(ctx); err != nil && ctx.Err() == nil { - m.logger.Error("error while publishing block", "error", err) - } - - // Reset both timers for the next aggregation window - lazyTimer.Reset(getRemainingSleep(start, m.config.Node.LazyBlockTime.Duration)) - blockTimer.Reset(getRemainingSleep(start, m.config.Node.BlockTime.Duration)) + // Attempt to publish the block + if err := m.publishBlock(ctx); err != nil && ctx.Err() == nil { + m.logger.Error("error while publishing block", "mode", mode, "error", err) + } else { + m.logger.Debug("Successfully published block", "mode", mode) } + + // Reset both timers for the 
next aggregation window + lazyTimer.Reset(getRemainingSleep(start, m.config.Node.LazyBlockInterval.Duration)) + blockTimer.Reset(getRemainingSleep(start, m.config.Node.BlockTime.Duration)) } func (m *Manager) normalAggregationLoop(ctx context.Context, blockTimer *time.Timer) { @@ -87,6 +104,12 @@ func (m *Manager) normalAggregationLoop(ctx context.Context, blockTimer *time.Ti // Reset the blockTimer to signal the next block production // period based on the block time. blockTimer.Reset(getRemainingSleep(start, m.config.Node.BlockTime.Duration)) + + case <-m.txNotifyCh: + // Transaction notifications are intentionally ignored in normal mode + // to avoid triggering block production outside the scheduled intervals. + // We just update the txsAvailable flag for tracking purposes + m.txsAvailable = true } } } diff --git a/block/aggregation_test.go b/block/aggregation_test.go index f16872cf29..fe512a4da8 100644 --- a/block/aggregation_test.go +++ b/block/aggregation_test.go @@ -45,8 +45,8 @@ func TestAggregationLoop_Normal_BasicInterval(t *testing.T) { logger: logger, config: config.Config{ Node: config.NodeConfig{ - BlockTime: config.DurationWrapper{Duration: blockTime}, - LazyAggregator: false, + BlockTime: config.DurationWrapper{Duration: blockTime}, + LazyMode: false, }, DA: config.DAConfig{ BlockTime: config.DurationWrapper{Duration: 1 * time.Second}, @@ -142,8 +142,8 @@ func TestAggregationLoop_Normal_PublishBlockError(t *testing.T) { logger: mockLogger, config: config.Config{ Node: config.NodeConfig{ - BlockTime: config.DurationWrapper{Duration: blockTime}, - LazyAggregator: false, + BlockTime: config.DurationWrapper{Duration: blockTime}, + LazyMode: false, }, DA: config.DAConfig{ BlockTime: config.DurationWrapper{Duration: 1 * time.Second}, diff --git a/block/lazy_aggregation_test.go b/block/lazy_aggregation_test.go new file mode 100644 index 0000000000..3c2ff77985 --- /dev/null +++ b/block/lazy_aggregation_test.go @@ -0,0 +1,403 @@ +package block + +import ( + 
"context" + "errors" + "sync" + "testing" + "time" + + "cosmossdk.io/log" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + + "github.com/rollkit/rollkit/pkg/config" +) + +// mockPublishBlock is used to control the behavior of publishBlock during tests +type mockPublishBlock struct { + mu sync.Mutex + calls chan struct{} + err error + delay time.Duration // Optional delay to simulate processing time +} + +// reset clears the calls channel in mockPublishBlock. +func (m *mockPublishBlock) reset() { + m.mu.Lock() + defer m.mu.Unlock() + // Clear the channel + for len(m.calls) > 0 { + <-m.calls + } +} + +func (m *mockPublishBlock) publish(ctx context.Context) error { + m.mu.Lock() + err := m.err + delay := m.delay + m.mu.Unlock() + + if delay > 0 { + time.Sleep(delay) + } + // Non-blocking send in case the channel buffer is full or receiver is not ready + select { + case m.calls <- struct{}{}: + default: + } + return err +} + +func setupTestManager(t *testing.T, blockTime, lazyTime time.Duration) (*Manager, *mockPublishBlock) { + t.Helper() + pubMock := &mockPublishBlock{ + calls: make(chan struct{}, 10), // Buffer to avoid blocking in tests + } + logger := log.NewTestLogger(t) + m := &Manager{ + logger: logger, + config: config.Config{ + Node: config.NodeConfig{ + BlockTime: config.DurationWrapper{Duration: blockTime}, + LazyBlockInterval: config.DurationWrapper{Duration: lazyTime}, + LazyMode: true, // Ensure lazy mode is active + }, + }, + publishBlock: pubMock.publish, + } + return m, pubMock +} + +// TestLazyAggregationLoop_BlockTimerTrigger tests that a block is published when the blockTimer fires first. 
+func TestLazyAggregationLoop_BlockTimerTrigger(t *testing.T) { + require := require.New(t) + + // Create a mock for the publishBlock function that counts calls + callCount := 0 + mockPublishFn := func(ctx context.Context) error { + callCount++ + return nil + } + + // Setup a manager with our mock publish function + blockTime := 50 * time.Millisecond + lazyTime := 200 * time.Millisecond // Lazy timer fires later + m, _ := setupTestManager(t, blockTime, lazyTime) + m.publishBlock = mockPublishFn + + // Set txsAvailable to true to ensure block timer triggers block production + m.txsAvailable = true + + ctx, cancel := context.WithCancel(context.Background()) + + // Start the lazy aggregation loop + var wg sync.WaitGroup + wg.Add(1) + go func() { + defer wg.Done() + blockTimer := time.NewTimer(0) // Fire immediately first time + defer blockTimer.Stop() + m.lazyAggregationLoop(ctx, blockTimer) + }() + + // Wait for at least one block to be published + time.Sleep(blockTime * 2) + + // Cancel the context to stop the loop + cancel() + wg.Wait() + + // Verify that at least one block was published + require.GreaterOrEqual(callCount, 1, "Expected at least one block to be published") +} + +// TestLazyAggregationLoop_LazyTimerTrigger tests that a block is published when the lazyTimer fires first. 
+func TestLazyAggregationLoop_LazyTimerTrigger(t *testing.T) { + assert := assert.New(t) + require := require.New(t) + + blockTime := 200 * time.Millisecond // Block timer fires later + lazyTime := 50 * time.Millisecond + m, pubMock := setupTestManager(t, blockTime, lazyTime) + + // Set txsAvailable to false to ensure lazy timer triggers block production + // and block timer doesn't + m.txsAvailable = false + + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + var wg sync.WaitGroup + wg.Add(1) + go func() { + defer wg.Done() + // Use real timers for this test + blockTimer := time.NewTimer(0) // Fire immediately first time + defer blockTimer.Stop() + m.lazyAggregationLoop(ctx, blockTimer) + }() + + // Wait for the first publish call triggered by the initial immediate lazyTimer fire + select { + case <-pubMock.calls: + // Good, first block published by lazy timer + case <-time.After(2 * lazyTime): // Give some buffer + require.Fail("timed out waiting for first block publication") + } + + // Wait for the second publish call, triggered by lazyTimer reset + select { + case <-pubMock.calls: + // Good, second block published by lazyTimer + case <-time.After(2 * lazyTime): // Give some buffer + require.Fail("timed out waiting for second block publication (lazyTimer)") + } + + // Ensure blockTimer didn't trigger a publish yet (since txsAvailable is false) + assert.Len(pubMock.calls, 0, "Expected no more publish calls yet") + + cancel() + wg.Wait() +} + +// TestLazyAggregationLoop_PublishError tests that the loop continues after a publish error. 
+func TestLazyAggregationLoop_PublishError(t *testing.T) { + require := require.New(t) + + blockTime := 50 * time.Millisecond + lazyTime := 100 * time.Millisecond + m, pubMock := setupTestManager(t, blockTime, lazyTime) + + pubMock.mu.Lock() + pubMock.err = errors.New("publish failed") + pubMock.mu.Unlock() + + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + var wg sync.WaitGroup + wg.Add(1) + go func() { + defer wg.Done() + // Use real timers + blockTimer := time.NewTimer(0) + defer blockTimer.Stop() + m.lazyAggregationLoop(ctx, blockTimer) + }() + + // Wait for the first publish attempt (which will fail) + select { + case <-pubMock.calls: + case <-time.After(2 * blockTime): + require.Fail("timed out waiting for first block publication attempt") + } + + // Remove the error for subsequent calls + pubMock.mu.Lock() + pubMock.err = nil + pubMock.mu.Unlock() + + // Wait for the second publish attempt (should succeed) + // Use a longer timeout since we need to wait for either the lazy timer or block timer to fire + select { + case <-pubMock.calls: + case <-time.After(2 * lazyTime): // Use the longer of the two timers with some buffer + require.Fail("timed out waiting for second block publication attempt after error") + } + + cancel() + wg.Wait() +} + +// TestGetRemainingSleep tests the calculation of sleep duration. 
+func TestGetRemainingSleep(t *testing.T) { + assert := assert.New(t) + + interval := 100 * time.Millisecond + + // Case 1: Elapsed time is less than interval + start1 := time.Now().Add(-30 * time.Millisecond) // Started 30ms ago + sleep1 := getRemainingSleep(start1, interval) + // Expecting interval - elapsed = 100ms - 30ms = 70ms (allow for slight variations) + assert.InDelta(interval-30*time.Millisecond, sleep1, float64(5*time.Millisecond), "Case 1 failed") + + // Case 2: Elapsed time is greater than or equal to interval + start2 := time.Now().Add(-120 * time.Millisecond) // Started 120ms ago + sleep2 := getRemainingSleep(start2, interval) + // Expecting minimum sleep time + assert.Equal(time.Millisecond, sleep2, "Case 2 failed") + + // Case 3: Elapsed time is exactly the interval + start3 := time.Now().Add(-100 * time.Millisecond) // Started 100ms ago + sleep3 := getRemainingSleep(start3, interval) + // Expecting minimum sleep time + assert.Equal(time.Millisecond, sleep3, "Case 3 failed") + + // Case 4: Zero elapsed time + start4 := time.Now() + sleep4 := getRemainingSleep(start4, interval) + assert.InDelta(interval, sleep4, float64(5*time.Millisecond), "Case 4 failed") +} + +// TestLazyAggregationLoop_TxNotification tests that transaction notifications trigger block production in lazy mode +func TestLazyAggregationLoop_TxNotification(t *testing.T) { + require := require.New(t) + + blockTime := 200 * time.Millisecond + lazyTime := 500 * time.Millisecond + m, pubMock := setupTestManager(t, blockTime, lazyTime) + m.config.Node.LazyMode = true + + // Create the notification channel + m.txNotifyCh = make(chan struct{}, 1) + + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + var wg sync.WaitGroup + wg.Add(1) + go func() { + defer wg.Done() + // Start with a timer that won't fire immediately + blockTimer := time.NewTimer(blockTime) + defer blockTimer.Stop() + m.lazyAggregationLoop(ctx, blockTimer) + }() + + // Wait for the initial lazy 
timer to fire and publish a block + select { + case <-pubMock.calls: + // Initial block was published by lazy timer + case <-time.After(100 * time.Millisecond): + require.Fail("Initial block was not published") + } + + // Reset the mock to track new calls + pubMock.reset() + + // Wait a bit to ensure the loop is running with reset timers + time.Sleep(20 * time.Millisecond) + + // Send a transaction notification + m.NotifyNewTransactions() + + // Wait for the block timer to fire and check txsAvailable + select { + case <-pubMock.calls: + // Block was published, which is what we expect + case <-time.After(blockTime + 50*time.Millisecond): + require.Fail("Block was not published after transaction notification") + } + + // Reset the mock again + pubMock.reset() + + // Send another notification immediately + m.NotifyNewTransactions() + + // Wait for the next block timer to fire + select { + case <-pubMock.calls: + // Block was published after notification + case <-time.After(blockTime + 50*time.Millisecond): + require.Fail("Block was not published after second notification") + } + + cancel() + wg.Wait() +} + +// TestEmptyBlockCreation tests that empty blocks are created with the correct dataHash +func TestEmptyBlockCreation(t *testing.T) { + require := require.New(t) + + // Create a mock for the publishBlock function that captures the context + var capturedCtx context.Context + mockPublishFn := func(ctx context.Context) error { + capturedCtx = ctx + return nil + } + + // Setup a manager with our mock publish function + blockTime := 50 * time.Millisecond + lazyTime := 100 * time.Millisecond + m, _ := setupTestManager(t, blockTime, lazyTime) + m.publishBlock = mockPublishFn + + // Create a context we can cancel + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + // Create timers for the test + lazyTimer := time.NewTimer(lazyTime) + blockTimer := time.NewTimer(blockTime) + defer lazyTimer.Stop() + defer blockTimer.Stop() + + // Call produceBlock 
directly to test empty block creation + m.produceBlock(ctx, "test_trigger", lazyTimer, blockTimer) + + // Verify that the context was passed correctly + require.NotNil(capturedCtx, "Context should have been captured by mock publish function") + require.Equal(ctx, capturedCtx, "Context should match the one passed to produceBlock") +} + +// TestNormalAggregationLoop_TxNotification tests that transaction notifications are handled in normal mode +func TestNormalAggregationLoop_TxNotification(t *testing.T) { + require := require.New(t) + + blockTime := 100 * time.Millisecond + m, pubMock := setupTestManager(t, blockTime, 0) + m.config.Node.LazyMode = false + + // Create the notification channel + m.txNotifyCh = make(chan struct{}, 1) + + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + var wg sync.WaitGroup + wg.Add(1) + go func() { + defer wg.Done() + blockTimer := time.NewTimer(blockTime) + defer blockTimer.Stop() + m.normalAggregationLoop(ctx, blockTimer) + }() + + // Wait for the first block to be published by the timer + select { + case <-pubMock.calls: + // Block was published by timer, which is expected + case <-time.After(blockTime * 2): + require.Fail("Block was not published by timer") + } + + // Reset the publish mock to track new calls + pubMock.reset() + + // Send a transaction notification + m.NotifyNewTransactions() + + // In normal mode, the notification should not trigger an immediate block + select { + case <-pubMock.calls: + // If we enable the optional enhancement to reset the timer, this might happen + // But with the current implementation, this should not happen + require.Fail("Block was published immediately after notification in normal mode") + case <-time.After(blockTime / 2): + // This is expected - no immediate block + } + + // Wait for the next regular block + select { + case <-pubMock.calls: + // Block was published by timer, which is expected + case <-time.After(blockTime * 2): + require.Fail("Block was not 
published by timer after notification") + } + + cancel() + wg.Wait() +} diff --git a/block/manager.go b/block/manager.go index 52422a39ef..5fcc6ee0bc 100644 --- a/block/manager.go +++ b/block/manager.go @@ -126,7 +126,7 @@ type Manager struct { logger log.Logger // For usage by Lazy Aggregator mode - buildingBlock bool + txsAvailable bool pendingHeaders *PendingHeaders @@ -148,6 +148,9 @@ type Manager struct { // publishBlock is the function used to publish blocks. It defaults to // the manager's publishBlock method but can be overridden for testing. publishBlock publishBlockFunc + + // txNotifyCh is used to signal when new transactions are available + txNotifyCh chan struct{} } // getInitialState tries to load lastState from Store, and if it's not available it reads genesis. @@ -281,9 +284,9 @@ func NewManager( config.Node.BlockTime.Duration = defaultBlockTime } - if config.Node.LazyBlockTime.Duration == 0 { + if config.Node.LazyBlockInterval.Duration == 0 { logger.Info("Using default lazy block time", "LazyBlockTime", defaultLazyBlockTime) - config.Node.LazyBlockTime.Duration = defaultLazyBlockTime + config.Node.LazyBlockInterval.Duration = defaultLazyBlockTime } if config.DA.MempoolTTL == 0 { @@ -332,7 +335,7 @@ func NewManager( dataCache: cache.NewCache[types.Data](), retrieveCh: make(chan struct{}, 1), logger: logger, - buildingBlock: false, + txsAvailable: false, pendingHeaders: pendingHeaders, metrics: seqMetrics, sequencer: sequencer, @@ -340,6 +343,7 @@ func NewManager( da: da, gasPrice: gasPrice, gasMultiplier: gasMultiplier, + txNotifyCh: make(chan struct{}, 1), // Non-blocking channel } agg.init(ctx) // Set the default publishBlock implementation @@ -445,15 +449,19 @@ func (m *Manager) retrieveBatch(ctx context.Context) (*BatchData, error) { "txCount", len(res.Batch.Transactions), "timestamp", res.Timestamp) + var errRetrieveBatch error + // Even if there are no transactions, return the batch with timestamp + // This allows empty blocks to maintain 
proper timing if len(res.Batch.Transactions) == 0 { - return nil, ErrNoBatch + errRetrieveBatch = ErrNoBatch } - h := convertBatchDataToBytes(res.BatchData) - if err := m.store.SetMetadata(ctx, LastBatchDataKey, h); err != nil { + // Even if there are no transactions, update lastBatchData so we don’t + // repeatedly emit the same empty batch, and persist it to metadata. + if err := m.store.SetMetadata(ctx, LastBatchDataKey, convertBatchDataToBytes(res.BatchData)); err != nil { m.logger.Error("error while setting last batch hash", "error", err) } m.lastBatchData = res.BatchData - return &BatchData{Batch: res.Batch, Time: res.Timestamp, Data: res.BatchData}, nil + return &BatchData{Batch: res.Batch, Time: res.Timestamp, Data: res.BatchData}, errRetrieveBatch } return nil, ErrNoBatch } @@ -483,10 +491,12 @@ func (m *Manager) publishBlockInternal(ctx context.Context) error { lastHeaderTime time.Time err error ) + height, err := m.store.Height(ctx) if err != nil { return fmt.Errorf("error while getting store height: %w", err) } + newHeight := height + 1 // this is a special case, when first block is produced - there is no previous commit if newHeight <= m.genesis.InitialHeight { @@ -521,26 +531,30 @@ func (m *Manager) publishBlockInternal(ctx context.Context) error { data = pendingData } else { batchData, err := m.retrieveBatch(ctx) - if errors.Is(err, ErrNoBatch) { - m.logger.Info("No batch retrieved from sequencer, skipping block production") - return nil // Indicate no block was produced - } else if err != nil { - return fmt.Errorf("failed to get transactions from batch: %w", err) + if err != nil { + if errors.Is(err, ErrNoBatch) { + if batchData == nil { + m.logger.Info("No batch retrieved from sequencer, skipping block production") + return nil + } + m.logger.Info("Creating empty block", "height", newHeight) + } else { + return fmt.Errorf("failed to get transactions from batch: %w", err) + } + } else { + if batchData.Before(lastHeaderTime) { + return 
fmt.Errorf("timestamp is not monotonically increasing: %s < %s", batchData.Time, m.getLastBlockTime()) + } + m.logger.Info("Creating and publishing block", "height", newHeight) + m.logger.Debug("block info", "num_tx", len(batchData.Transactions)) } - // sanity check timestamp for monotonically increasing - if batchData.Before(lastHeaderTime) { - return fmt.Errorf("timestamp is not monotonically increasing: %s < %s", batchData.Time, m.getLastBlockTime()) - } - m.logger.Info("Creating and publishing block", "height", newHeight) header, data, err = m.createBlock(ctx, newHeight, lastSignature, lastHeaderHash, batchData) if err != nil { return err } - m.logger.Debug("block info", "num_tx", len(data.Txs)) - err = m.store.SaveBlockData(ctx, header, data, &signature) - if err != nil { + if err = m.store.SaveBlockData(ctx, header, data, &signature); err != nil { return SaveBlockError{err} } } @@ -664,25 +678,7 @@ func (m *Manager) createBlock(ctx context.Context, height uint64, lastSignature return m.execCreateBlock(ctx, height, lastSignature, lastHeaderHash, m.lastState, batchData) } -func (m *Manager) applyBlock(ctx context.Context, header *types.SignedHeader, data *types.Data) (types.State, error) { - m.lastStateMtx.RLock() - defer m.lastStateMtx.RUnlock() - return m.execApplyBlock(ctx, m.lastState, header, data) -} - -func (m *Manager) execValidate(_ types.State, _ *types.SignedHeader, _ *types.Data) error { - // TODO(tzdybal): implement - return nil -} - -func (m *Manager) execCommit(ctx context.Context, newState types.State, h *types.SignedHeader, _ *types.Data) ([]byte, error) { - err := m.exec.SetFinal(ctx, h.Height()) - return newState.AppHash, err -} - -func (m *Manager) execCreateBlock(_ context.Context, height uint64, lastSignature *types.Signature, lastHeaderHash types.Hash, lastState types.State, batchData *BatchData) (*types.SignedHeader, *types.Data, error) { - batchDataIDs := convertBatchDataToBytes(batchData.Data) - +func (m *Manager) execCreateBlock(ctx 
context.Context, height uint64, lastSignature *types.Signature, lastHeaderHash types.Hash, lastState types.State, batchData *BatchData) (*types.SignedHeader, *types.Data, error) { if m.signer == nil { return nil, nil, fmt.Errorf("signer is nil; cannot create block") } @@ -701,21 +697,32 @@ func (m *Manager) execCreateBlock(_ context.Context, height uint64, lastSignatur return nil, nil, fmt.Errorf("proposer address is not the same as the genesis proposer address %x != %x", address, m.genesis.ProposerAddress) } + // Determine if this is an empty block + isEmpty := batchData.Batch == nil || len(batchData.Transactions) == 0 + + // Set the appropriate data hash based on whether this is an empty block + var dataHash types.Hash + if isEmpty { + dataHash = dataHashForEmptyTxs // Use dataHashForEmptyTxs for empty blocks + } else { + dataHash = convertBatchDataToBytes(batchData.Data) + } + header := &types.SignedHeader{ Header: types.Header{ Version: types.Version{ - Block: lastState.Version.Block, - App: lastState.Version.App, + Block: m.lastState.Version.Block, + App: m.lastState.Version.App, }, BaseHeader: types.BaseHeader{ - ChainID: lastState.ChainID, + ChainID: m.lastState.ChainID, Height: height, Time: uint64(batchData.UnixNano()), //nolint:gosec // why is time unix? 
(tac0turtle) }, LastHeaderHash: lastHeaderHash, - DataHash: batchDataIDs, + DataHash: dataHash, ConsensusHash: make(types.Hash, 32), - AppHash: lastState.AppHash, + AppHash: m.lastState.AppHash, ProposerAddress: m.genesis.ProposerAddress, }, Signature: *lastSignature, @@ -725,16 +732,38 @@ func (m *Manager) execCreateBlock(_ context.Context, height uint64, lastSignatur }, } + // Create block data with appropriate transactions blockData := &types.Data{ - Txs: make(types.Txs, len(batchData.Transactions)), + Txs: make(types.Txs, 0), // Start with empty transaction list } - for i := range batchData.Transactions { - blockData.Txs[i] = types.Tx(batchData.Transactions[i]) + + // Only add transactions if this is not an empty block + if !isEmpty { + blockData.Txs = make(types.Txs, len(batchData.Transactions)) + for i := range batchData.Transactions { + blockData.Txs[i] = types.Tx(batchData.Transactions[i]) + } } return header, blockData, nil } +func (m *Manager) applyBlock(ctx context.Context, header *types.SignedHeader, data *types.Data) (types.State, error) { + m.lastStateMtx.RLock() + defer m.lastStateMtx.RUnlock() + return m.execApplyBlock(ctx, m.lastState, header, data) +} + +func (m *Manager) execValidate(_ types.State, _ *types.SignedHeader, _ *types.Data) error { + // TODO(tzdybal): implement + return nil +} + +func (m *Manager) execCommit(ctx context.Context, newState types.State, h *types.SignedHeader, _ *types.Data) ([]byte, error) { + err := m.exec.SetFinal(ctx, h.Height()) + return newState.AppHash, err +} + func (m *Manager) execApplyBlock(ctx context.Context, lastState types.State, header *types.SignedHeader, data *types.Data) (types.State, error) { rawTxs := make([][]byte, len(data.Txs)) for i := range data.Txs { @@ -846,3 +875,16 @@ func (m *Manager) getSignature(header types.Header) (types.Signature, error) { } return m.signer.Sign(b) } + +// NotifyNewTransactions signals that new transactions are available for processing +// This method will be called by 
the Reaper when it receives new transactions +func (m *Manager) NotifyNewTransactions() { + // Non-blocking send to avoid slowing down the transaction submission path + select { + case m.txNotifyCh <- struct{}{}: + // Successfully sent notification + default: + // Channel buffer is full, which means a notification is already pending + // This is fine, as we just need to trigger one block production + } +} diff --git a/block/publish_block_test.go b/block/publish_block_test.go index f3440250c5..a52f50b89b 100644 --- a/block/publish_block_test.go +++ b/block/publish_block_test.go @@ -227,6 +227,9 @@ func Test_publishBlock_EmptyBatch(t *testing.T) { noopSigner, err := noopsigner.NewNoopSigner(privKey) require.NoError(err) + daH := atomic.Uint64{} + daH.Store(0) + m := &Manager{ store: mockStore, sequencer: mockSeq, @@ -245,6 +248,18 @@ func Test_publishBlock_EmptyBatch(t *testing.T) { }, lastStateMtx: &sync.RWMutex{}, metrics: NopMetrics(), + lastState: types.State{ + ChainID: chainID, + InitialHeight: 1, + LastBlockHeight: 1, + LastBlockTime: time.Now(), + AppHash: []byte("initialAppHash"), + }, + headerCache: cache.NewCache[types.SignedHeader](), + dataCache: cache.NewCache[types.Data](), + HeaderCh: make(chan *types.SignedHeader, 1), + DataCh: make(chan *types.Data, 1), + daHeight: &daH, } m.publishBlock = m.publishBlockInternal @@ -279,19 +294,34 @@ func Test_publishBlock_EmptyBatch(t *testing.T) { }) mockSeq.On("GetNextBatch", ctx, batchReqMatcher).Return(emptyBatchResponse, nil).Once() + // Mock SetMetadata for LastBatchDataKey (required for empty batch handling) + mockStore.On("SetMetadata", ctx, "l", mock.AnythingOfType("[]uint8")).Return(nil).Once() + + // With our new implementation, we should expect SaveBlockData to be called for empty blocks + mockStore.On("SaveBlockData", ctx, mock.AnythingOfType("*types.SignedHeader"), mock.AnythingOfType("*types.Data"), mock.AnythingOfType("*types.Signature")).Return(nil).Once() + + // We should also expect ExecuteTxs to 
be called with an empty transaction list + newAppHash := []byte("newAppHash") + mockExec.On("ExecuteTxs", ctx, mock.Anything, currentHeight+1, mock.AnythingOfType("time.Time"), m.lastState.AppHash).Return(newAppHash, uint64(100), nil).Once() + + // SetFinal should be called + mockExec.On("SetFinal", ctx, currentHeight+1).Return(nil).Once() + + // SetHeight should be called + mockStore.On("SetHeight", ctx, currentHeight+1).Return(nil).Once() + + // UpdateState should be called + mockStore.On("UpdateState", ctx, mock.AnythingOfType("types.State")).Return(nil).Once() + + // SaveBlockData should be called again after validation + mockStore.On("SaveBlockData", ctx, mock.AnythingOfType("*types.SignedHeader"), mock.AnythingOfType("*types.Data"), mock.AnythingOfType("*types.Signature")).Return(nil).Once() + // Call publishBlock err = m.publishBlock(ctx) // Assertions require.NoError(err, "publishBlock should return nil error when the batch is empty") - // Verify mocks: Ensure methods after the check were NOT called - mockStore.AssertNotCalled(t, "SaveBlockData", mock.Anything, mock.Anything, mock.Anything, mock.Anything) - mockExec.AssertNotCalled(t, "ExecuteTxs", mock.Anything, mock.Anything, mock.Anything, mock.Anything, mock.Anything) - mockExec.AssertNotCalled(t, "SetFinal", mock.Anything, mock.Anything) - mockStore.AssertNotCalled(t, "SetHeight", mock.Anything, mock.Anything) - mockStore.AssertNotCalled(t, "UpdateState", mock.Anything, mock.Anything) - mockSeq.AssertExpectations(t) mockStore.AssertExpectations(t) mockExec.AssertExpectations(t) diff --git a/block/reaper.go b/block/reaper.go index a517747bf0..5c18c1cd29 100644 --- a/block/reaper.go +++ b/block/reaper.go @@ -27,6 +27,7 @@ type Reaper struct { logger log.Logger ctx context.Context seenStore ds.Batching + manager *Manager } // NewReaper creates a new Reaper instance with persistent seenTx storage. 
@@ -45,6 +46,11 @@ func NewReaper(ctx context.Context, exec coreexecutor.Executor, sequencer corese } } +// SetManager sets the Manager reference for transaction notifications +func (r *Reaper) SetManager(manager *Manager) { + r.manager = manager +} + // Start begins the reaping process at the specified interval. func (r *Reaper) Start(ctx context.Context) { r.ctx = ctx @@ -110,6 +116,12 @@ func (r *Reaper) SubmitTxs() { } } + // Notify the manager that new transactions are available + if r.manager != nil && len(newTxs) > 0 { + r.logger.Debug("Notifying manager of new transactions") + r.manager.NotifyNewTransactions() + } + r.logger.Debug("Reaper successfully submitted txs") } diff --git a/node/full.go b/node/full.go index 21413a8359..c5e95c1f85 100644 --- a/node/full.go +++ b/node/full.go @@ -97,16 +97,6 @@ func newFullNode( store := store.New(mainKV) - reaper := block.NewReaper( - ctx, - exec, - sequencer, - genesis.ChainID, - nodeConfig.Node.BlockTime.Duration, - logger.With("module", "Reaper"), - mainKV, - ) - blockManager, err := initBlockManager( ctx, signer, @@ -127,6 +117,19 @@ func newFullNode( return nil, err } + reaper := block.NewReaper( + ctx, + exec, + sequencer, + genesis.ChainID, + nodeConfig.Node.BlockTime.Duration, + logger.With("module", "Reaper"), + mainKV, + ) + + // Connect the reaper to the manager for transaction notifications + reaper.SetManager(blockManager) + node := &FullNode{ genesis: genesis, nodeConfig: nodeConfig, diff --git a/node/helpers_test.go b/node/helpers_test.go index f334db00dc..226fb8e229 100644 --- a/node/helpers_test.go +++ b/node/helpers_test.go @@ -22,9 +22,9 @@ func getTestConfig(t *testing.T, n int) rollkitconfig.Config { return rollkitconfig.Config{ RootDir: t.TempDir(), Node: rollkitconfig.NodeConfig{ - Aggregator: true, - BlockTime: rollkitconfig.DurationWrapper{Duration: 500 * time.Millisecond}, - LazyBlockTime: rollkitconfig.DurationWrapper{Duration: 5 * time.Second}, + Aggregator: true, + BlockTime: 
rollkitconfig.DurationWrapper{Duration: 500 * time.Millisecond}, + LazyBlockInterval: rollkitconfig.DurationWrapper{Duration: 5 * time.Second}, }, DA: rollkitconfig.DAConfig{ Address: MockDAAddress, diff --git a/pkg/cmd/run_node_test.go b/pkg/cmd/run_node_test.go index 98c387a65e..f94e334c00 100644 --- a/pkg/cmd/run_node_test.go +++ b/pkg/cmd/run_node_test.go @@ -63,8 +63,8 @@ func TestParseFlags(t *testing.T) { "--rollkit.da.mempool_ttl", "10", "--rollkit.da.namespace", "namespace", "--rollkit.da.start_height", "100", - "--rollkit.node.lazy_aggregator", - "--rollkit.node.lazy_block_time", "2m", + "--rollkit.node.lazy_mode", + "--rollkit.node.lazy_block_interval", "2m", "--rollkit.node.light", "--rollkit.node.max_pending_blocks", "100", "--rollkit.node.trusted_hash", "abcdef1234567890", @@ -123,8 +123,8 @@ func TestParseFlags(t *testing.T) { {"DAMempoolTTL", nodeConfig.DA.MempoolTTL, uint64(10)}, {"DANamespace", nodeConfig.DA.Namespace, "namespace"}, {"DAStartHeight", nodeConfig.DA.StartHeight, uint64(100)}, - {"LazyAggregator", nodeConfig.Node.LazyAggregator, true}, - {"LazyBlockTime", nodeConfig.Node.LazyBlockTime.Duration, 2 * time.Minute}, + {"LazyAggregator", nodeConfig.Node.LazyMode, true}, + {"LazyBlockTime", nodeConfig.Node.LazyBlockInterval.Duration, 2 * time.Minute}, {"Light", nodeConfig.Node.Light, true}, {"MaxPendingBlocks", nodeConfig.Node.MaxPendingBlocks, uint64(100)}, {"TrustedHash", nodeConfig.Node.TrustedHash, "abcdef1234567890"}, diff --git a/pkg/config/config.go b/pkg/config/config.go index ee1a34bf08..7394222967 100644 --- a/pkg/config/config.go +++ b/pkg/config/config.go @@ -37,11 +37,11 @@ const ( // FlagTrustedHash is a flag for specifying the trusted hash FlagTrustedHash = "rollkit.node.trusted_hash" // FlagLazyAggregator is a flag for enabling lazy aggregation mode that only produces blocks when transactions are available - FlagLazyAggregator = "rollkit.node.lazy_aggregator" + FlagLazyAggregator = "rollkit.node.lazy_mode" // 
FlagMaxPendingBlocks is a flag to limit and pause block production when too many blocks are waiting for DA confirmation FlagMaxPendingBlocks = "rollkit.node.max_pending_blocks" // FlagLazyBlockTime is a flag for specifying the maximum interval between blocks in lazy aggregation mode - FlagLazyBlockTime = "rollkit.node.lazy_block_time" + FlagLazyBlockTime = "rollkit.node.lazy_block_interval" // Data Availability configuration flags @@ -162,10 +162,10 @@ type NodeConfig struct { Light bool `yaml:"light" comment:"Run node in light mode"` // Block management configuration - BlockTime DurationWrapper `mapstructure:"block_time" yaml:"block_time" comment:"Block time (duration). Examples: \"500ms\", \"1s\", \"5s\", \"1m\", \"2m30s\", \"10m\"."` - MaxPendingBlocks uint64 `mapstructure:"max_pending_blocks" yaml:"max_pending_blocks" comment:"Maximum number of blocks pending DA submission. When this limit is reached, the aggregator pauses block production until some blocks are confirmed. Use 0 for no limit."` - LazyAggregator bool `mapstructure:"lazy_aggregator" yaml:"lazy_aggregator" comment:"Enables lazy aggregation mode, where blocks are only produced when transactions are available or after LazyBlockTime. Optimizes resources by avoiding empty block creation during periods of inactivity."` - LazyBlockTime DurationWrapper `mapstructure:"lazy_block_time" yaml:"lazy_block_time" comment:"Maximum interval between blocks in lazy aggregation mode (LazyAggregator). Ensures blocks are produced periodically even without transactions to keep the chain active. Generally larger than BlockTime."` + BlockTime DurationWrapper `mapstructure:"block_time" yaml:"block_time" comment:"Block time (duration). Examples: \"500ms\", \"1s\", \"5s\", \"1m\", \"2m30s\", \"10m\"."` + MaxPendingBlocks uint64 `mapstructure:"max_pending_blocks" yaml:"max_pending_blocks" comment:"Maximum number of blocks pending DA submission. 
When this limit is reached, the aggregator pauses block production until some blocks are confirmed. Use 0 for no limit."` + LazyMode bool `mapstructure:"lazy_mode" yaml:"lazy_mode" comment:"Enables lazy aggregation mode, where blocks are only produced when transactions are available or after LazyBlockInterval. Optimizes resources by avoiding empty block creation during periods of inactivity."` + LazyBlockInterval DurationWrapper `mapstructure:"lazy_block_interval" yaml:"lazy_block_interval" comment:"Maximum interval between blocks in lazy aggregation mode (LazyMode). Ensures blocks are produced periodically even without transactions to keep the chain active. Generally larger than BlockTime."` // Header configuration TrustedHash string `mapstructure:"trusted_hash" yaml:"trusted_hash" comment:"Initial trusted hash used to bootstrap the header exchange service. Allows nodes to start synchronizing from a specific trusted point in the chain instead of genesis. When provided, the node will fetch the corresponding header/block from peers using this hash and use it as a starting point for synchronization. 
If not provided, the node will attempt to fetch the genesis block instead."` @@ -240,9 +240,9 @@ func AddFlags(cmd *cobra.Command) { cmd.Flags().Bool(FlagLight, def.Node.Light, "run light client") cmd.Flags().Duration(FlagBlockTime, def.Node.BlockTime.Duration, "block time (for aggregator mode)") cmd.Flags().String(FlagTrustedHash, def.Node.TrustedHash, "initial trusted hash to start the header exchange service") - cmd.Flags().Bool(FlagLazyAggregator, def.Node.LazyAggregator, "produce blocks only when transactions are available or after lazy block time") + cmd.Flags().Bool(FlagLazyAggregator, def.Node.LazyMode, "produce blocks only when transactions are available or after lazy block time") cmd.Flags().Uint64(FlagMaxPendingBlocks, def.Node.MaxPendingBlocks, "maximum blocks pending DA confirmation before pausing block production (0 for no limit)") - cmd.Flags().Duration(FlagLazyBlockTime, def.Node.LazyBlockTime.Duration, "maximum interval between blocks in lazy aggregation mode") + cmd.Flags().Duration(FlagLazyBlockTime, def.Node.LazyBlockInterval.Duration, "maximum interval between blocks in lazy aggregation mode") // Data Availability configuration flags cmd.Flags().String(FlagDAAddress, def.DA.Address, "DA address (host:port)") diff --git a/pkg/config/config_test.go b/pkg/config/config_test.go index 7974a1f0a6..da44d7fd7a 100644 --- a/pkg/config/config_test.go +++ b/pkg/config/config_test.go @@ -27,12 +27,12 @@ func TestDefaultConfig(t *testing.T) { assert.Equal(t, "", def.DA.SubmitOptions) assert.Equal(t, "", def.DA.Namespace) assert.Equal(t, 1*time.Second, def.Node.BlockTime.Duration) - assert.Equal(t, 15*time.Second, def.DA.BlockTime.Duration) + assert.Equal(t, 6*time.Second, def.DA.BlockTime.Duration) assert.Equal(t, uint64(0), def.DA.StartHeight) assert.Equal(t, uint64(0), def.DA.MempoolTTL) assert.Equal(t, uint64(0), def.Node.MaxPendingBlocks) - assert.Equal(t, false, def.Node.LazyAggregator) - assert.Equal(t, 60*time.Second, def.Node.LazyBlockTime.Duration) 
+ assert.Equal(t, false, def.Node.LazyMode) + assert.Equal(t, 60*time.Second, def.Node.LazyBlockInterval.Duration) assert.Equal(t, "", def.Node.TrustedHash) assert.Equal(t, "file", def.Signer.SignerType) assert.Equal(t, "config", def.Signer.SignerPath) @@ -57,9 +57,9 @@ func TestAddFlags(t *testing.T) { assertFlagValue(t, flags, FlagLight, DefaultConfig.Node.Light) assertFlagValue(t, flags, FlagBlockTime, DefaultConfig.Node.BlockTime.Duration) assertFlagValue(t, flags, FlagTrustedHash, DefaultConfig.Node.TrustedHash) - assertFlagValue(t, flags, FlagLazyAggregator, DefaultConfig.Node.LazyAggregator) + assertFlagValue(t, flags, FlagLazyAggregator, DefaultConfig.Node.LazyMode) assertFlagValue(t, flags, FlagMaxPendingBlocks, DefaultConfig.Node.MaxPendingBlocks) - assertFlagValue(t, flags, FlagLazyBlockTime, DefaultConfig.Node.LazyBlockTime.Duration) + assertFlagValue(t, flags, FlagLazyBlockTime, DefaultConfig.Node.LazyBlockInterval.Duration) // DA flags assertFlagValue(t, flags, FlagDAAddress, DefaultConfig.DA.Address) @@ -232,7 +232,7 @@ signer: cmd.SetArgs([]string{ "--home=" + tempDir, "--rollkit.da.gas_price=0.5", - "--rollkit.node.lazy_aggregator=true", + "--rollkit.node.lazy_mode=true", }) err = cmd.Execute() require.NoError(t, err) @@ -245,7 +245,7 @@ signer: v := viper.New() v.Set(FlagRootDir, tempDir) v.Set("rollkit.da.gas_price", "0.5") - v.Set("rollkit.node.lazy_aggregator", true) + v.Set("rollkit.node.lazy_mode", true) // Load configuration using the new LoadFromViper method cfgFromViper, err := LoadFromViper(v) @@ -254,7 +254,7 @@ signer: // Compare the results - they should be identical require.Equal(t, cfgFromLoad.RootDir, cfgFromViper.RootDir, "RootDir should match") require.Equal(t, cfgFromLoad.DA.GasPrice, cfgFromViper.DA.GasPrice, "DA.GasPrice should match") - require.Equal(t, cfgFromLoad.Node.LazyAggregator, cfgFromViper.Node.LazyAggregator, "Node.LazyAggregator should match") + require.Equal(t, cfgFromLoad.Node.LazyMode, cfgFromViper.Node.LazyMode, 
"Node.LazyMode should match") require.Equal(t, cfgFromLoad.Node.Aggregator, cfgFromViper.Node.Aggregator, "Node.Aggregator should match") require.Equal(t, cfgFromLoad.Node.BlockTime, cfgFromViper.Node.BlockTime, "Node.BlockTime should match") require.Equal(t, cfgFromLoad.DA.Address, cfgFromViper.DA.Address, "DA.Address should match") diff --git a/pkg/config/defaults.go b/pkg/config/defaults.go index b3d52cd521..6b5ce0474b 100644 --- a/pkg/config/defaults.go +++ b/pkg/config/defaults.go @@ -45,16 +45,16 @@ var DefaultConfig = Config{ Peers: "", }, Node: NodeConfig{ - Aggregator: false, - BlockTime: DurationWrapper{1 * time.Second}, - LazyAggregator: false, - LazyBlockTime: DurationWrapper{60 * time.Second}, - Light: false, - TrustedHash: "", + Aggregator: false, + BlockTime: DurationWrapper{1 * time.Second}, + LazyMode: false, + LazyBlockInterval: DurationWrapper{60 * time.Second}, + Light: false, + TrustedHash: "", }, DA: DAConfig{ Address: "http://localhost:7980", - BlockTime: DurationWrapper{15 * time.Second}, + BlockTime: DurationWrapper{6 * time.Second}, GasPrice: -1, GasMultiplier: 0, }, diff --git a/rollups/testapp/cmd/init_test.go b/rollups/testapp/cmd/init_test.go index eb9aae6f75..a002e6f574 100644 --- a/rollups/testapp/cmd/init_test.go +++ b/rollups/testapp/cmd/init_test.go @@ -71,7 +71,7 @@ func TestInitCommand(t *testing.T) { require.Contains(t, yamlContent, "da:") require.Contains(t, yamlContent, "block_time: ") require.Contains(t, yamlContent, "15s") - require.Contains(t, yamlContent, "lazy_block_time: ") + require.Contains(t, yamlContent, "lazy_block_interval: ") require.Contains(t, yamlContent, "1m0s") // Verify addresses diff --git a/rollups/testapp/kv/kvexecutor_test.go b/rollups/testapp/kv/kvexecutor_test.go index a6fd24d1d6..774c8b7735 100644 --- a/rollups/testapp/kv/kvexecutor_test.go +++ b/rollups/testapp/kv/kvexecutor_test.go @@ -1,6 +1,7 @@ package executor import ( + "bytes" "context" "reflect" "strings" @@ -32,7 +33,7 @@ func 
TestInitChain_Idempotency(t *testing.T) { if err != nil { t.Fatalf("InitChain failed on second call: %v", err) } - if !reflect.DeepEqual(stateRoot1, stateRoot2) { + if !bytes.Equal(stateRoot1, stateRoot2) { t.Errorf("Genesis state roots do not match: %s vs %s", stateRoot1, stateRoot2) } if maxBytes2 != 1024 { diff --git a/specs/lazy-adr/adr-007-header-commit-to-shares.md b/specs/lazy-adr/adr-007-header-commit-to-shares.md index 283fece0ae..acc5d08cde 100644 --- a/specs/lazy-adr/adr-007-header-commit-to-shares.md +++ b/specs/lazy-adr/adr-007-header-commit-to-shares.md @@ -56,13 +56,10 @@ Discarded ### Positive - ### Negative - ### Neutral - ## References 1. diff --git a/specs/lazy-adr/adr-021-lazy-aggregation.md b/specs/lazy-adr/adr-021-lazy-aggregation.md new file mode 100644 index 0000000000..40543440cb --- /dev/null +++ b/specs/lazy-adr/adr-021-lazy-aggregation.md @@ -0,0 +1,209 @@ +# ADR 021: Lazy Aggregation with DA Layer Consistency + +## Changelog + +- 2024-01-24: Initial draft +- 2024-01-24: Revised to use existing empty batch mechanism +- 2024-01-25: Updated with implementation details from aggregation.go + +## Context + +Rollkit's lazy aggregation mechanism currently produces blocks at set intervals when no transactions are present, and immediately when transactions are available. However, this approach creates inconsistency with the DA layer (Celestia) as empty blocks are not posted to the DA layer. This breaks the expected 1:1 mapping between DA layer blocks and execution layer blocks in EVM environments. + +## Decision + +Leverage the existing empty batch mechanism and `dataHashForEmptyTxs` to maintain block height consistency. + +## Detailed Design + +### Implementation Details + +1. **Modified Batch Retrieval**: + + The batch retrieval mechanism has been modified to handle empty batches differently. Instead of discarding empty batches, we now return them with the ErrNoBatch error, allowing the caller to create empty blocks with proper timestamps. 
This ensures that block timing remains consistent even during periods of inactivity. + + ```go + func (m *Manager) retrieveBatch(ctx context.Context) (*BatchData, error) { + res, err := m.sequencer.GetNextBatch(ctx, req) + if err != nil { + return nil, err + } + + if res != nil && res.Batch != nil { + m.logger.Debug("Retrieved batch", + "txCount", len(res.Batch.Transactions), + "timestamp", res.Timestamp) + + var errRetrieveBatch error + // Even if there are no transactions, return the batch with timestamp + // This allows empty blocks to maintain proper timing + if len(res.Batch.Transactions) == 0 { + errRetrieveBatch = ErrNoBatch + } + // Even if there are no transactions, update lastBatchData so we don't + // repeatedly emit the same empty batch, and persist it to metadata. + if err := m.store.SetMetadata(ctx, LastBatchDataKey, convertBatchDataToBytes(res.BatchData)); err != nil { + m.logger.Error("error while setting last batch hash", "error", err) + } + m.lastBatchData = res.BatchData + return &BatchData{Batch: res.Batch, Time: res.Timestamp, Data: res.BatchData}, errRetrieveBatch + } + return nil, ErrNoBatch + } + ``` + +2. **Empty Block Creation**: + + The block publishing logic has been enhanced to create empty blocks when a batch with no transactions is received. This uses the special `dataHashForEmptyTxs` value to indicate an empty batch, maintaining the block height consistency with the DA layer while minimizing overhead. 
+ + ```go + // In publishBlock method + batchData, err := m.retrieveBatch(ctx) + if err != nil { + if errors.Is(err, ErrNoBatch) { + if batchData == nil { + m.logger.Info("No batch retrieved from sequencer, skipping block production") + return nil + } + m.logger.Info("Creating empty block", "height", newHeight) + } else { + return fmt.Errorf("failed to get transactions from batch: %w", err) + } + } else { + if batchData.Before(lastHeaderTime) { + return fmt.Errorf("timestamp is not monotonically increasing: %s < %s", batchData.Time, m.getLastBlockTime()) + } + m.logger.Info("Creating and publishing block", "height", newHeight) + m.logger.Debug("block info", "num_tx", len(batchData.Batch.Transactions)) + } + + header, data, err = m.createBlock(ctx, newHeight, lastSignature, lastHeaderHash, batchData) + if err != nil { + return err + } + + if err = m.store.SaveBlockData(ctx, header, data, &signature); err != nil { + return SaveBlockError{err} + } + ``` + +3. **Lazy Aggregation Loop**: + + A dedicated lazy aggregation loop has been implemented with dual timer mechanisms. The `lazyTimer` ensures blocks are produced at regular intervals even during network inactivity, while the `blockTimer` handles normal block production when transactions are available. Transaction notifications from the `Reaper` to the `Manager` are now handled via the `txNotifyCh` channel: when the `Reaper` detects new transactions, it calls `Manager.NotifyNewTransactions()`, which performs a non-blocking signal on this channel. See the tests in `block/lazy_aggregation_test.go` for verification of this behavior. 
+ + ```go + // In Reaper.SubmitTxs + if r.manager != nil && len(newTxs) > 0 { + r.logger.Debug("Notifying manager of new transactions") + r.manager.NotifyNewTransactions() // Signals txNotifyCh + } + + // In Manager.NotifyNewTransactions + func (m *Manager) NotifyNewTransactions() { + select { + case m.txNotifyCh <- struct{}{}: + // Successfully sent notification + default: + // Channel buffer is full, notification already pending + } + } + // Modified lazyAggregationLoop + func (m *Manager) lazyAggregationLoop(ctx context.Context, blockTimer *time.Timer) { + // lazyTimer triggers block publication even during inactivity + lazyTimer := time.NewTimer(0) + defer lazyTimer.Stop() + + for { + select { + case <-ctx.Done(): + return + + case <-lazyTimer.C: + m.logger.Debug("Lazy timer triggered block production") + m.produceBlock(ctx, "lazy_timer", lazyTimer, blockTimer) + + case <-blockTimer.C: + if m.txsAvailable { + m.produceBlock(ctx, "block_timer", lazyTimer, blockTimer) + m.txsAvailable = false + } else { + // Ensure we keep ticking even when there are no txs + blockTimer.Reset(m.config.Node.BlockTime.Duration) + } + case <-m.txNotifyCh: + m.txsAvailable = true + } + } + } + ``` + +4. **Block Production**: + + The block production function centralizes the logic for publishing blocks and resetting timers. It records the start time, attempts to publish a block, and then intelligently resets both timers based on the elapsed time. This ensures that block production remains on schedule even if the block creation process takes significant time. 
+ + ```go + func (m *Manager) produceBlock(ctx context.Context, mode string, lazyTimer, blockTimer *time.Timer) { + // Record the start time + start := time.Now() + + // Attempt to publish the block + if err := m.publishBlock(ctx); err != nil && ctx.Err() == nil { + m.logger.Error("error while publishing block", "mode", mode, "error", err) + } else { + m.logger.Debug("Successfully published block", "mode", mode) + } + + // Reset both timers for the next aggregation window + lazyTimer.Reset(getRemainingSleep(start, m.config.Node.LazyBlockInterval.Duration)) + blockTimer.Reset(getRemainingSleep(start, m.config.Node.BlockTime.Duration)) + } + ``` + +### Key Changes + +1. Return batch with timestamp even when empty, allowing proper block timing +2. Use existing `dataHashForEmptyTxs` for empty block indication +3. Leverage current sync mechanisms that already handle empty blocks +4. Implement a dedicated lazy aggregation loop with two timers: + - `blockTimer`: Triggers block production at regular intervals when transactions are available + - `lazyTimer`: Ensures blocks are produced even during periods of inactivity +5. 
Maintain transaction availability tracking via the `txsAvailable` flag and notification channel + +### Efficiency Considerations + +- Minimal DA layer overhead for empty blocks +- Reuses existing empty block detection mechanism +- Maintains proper block timing using batch timestamps +- Intelligent timer management to account for block production time +- Non-blocking transaction notification channel to prevent backpressure + +## Status + +Implemented + +## Consequences + +### Positive + +- Maintains consistent block heights between DA and execution layers +- Leverages existing empty block mechanisms +- Simpler implementation than sentinel-based approach +- Preserves proper block timing +- Provides deterministic block production even during network inactivity +- Reduces latency for transaction inclusion during active periods + +### Negative + +- Small DA layer overhead for empty blocks +- Additional complexity in timer management + +### Neutral + +- Requires careful handling of batch timestamps +- Maintains backward compatibility with existing Rollkit deployments + +## References + +- [Block Manager Implementation](block/manager.go) +- [Block Aggregation Implementation](block/aggregation.go) +- [Lazy Aggregation Tests](block/lazy_aggregation_test.go) diff --git a/specs/src/specs/block-manager.md b/specs/src/specs/block-manager.md index b60b51e27b..ab498b2653 100644 --- a/specs/src/specs/block-manager.md +++ b/specs/src/specs/block-manager.md @@ -63,7 +63,8 @@ Block manager configuration options: |BlockTime|time.Duration|time interval used for block production and block retrieval from block store ([`defaultBlockTime`][defaultBlockTime])| |DABlockTime|time.Duration|time interval used for both block publication to DA network and block retrieval from DA network ([`defaultDABlockTime`][defaultDABlockTime])| |DAStartHeight|uint64|block retrieval from DA network starts from this height| -|LazyBlockTime|time.Duration|time interval used for block production in lazy aggregator 
mode even when there are no transactions ([`defaultLazyBlockTime`][defaultLazyBlockTime])| +|LazyBlockInterval|time.Duration|time interval used for block production in lazy aggregator mode even when there are no transactions ([`defaultLazyBlockTime`][defaultLazyBlockTime])| +|LazyMode|bool|when set to true, enables lazy aggregation mode which produces blocks only when transactions are available or at LazyBlockInterval intervals| ### Block Production @@ -71,7 +72,12 @@ When the full node is operating as a sequencer (aka aggregator), the block manag In `normal` mode, the block manager runs a timer, which is set to the `BlockTime` configuration parameter, and continuously produces blocks at `BlockTime` intervals. -In `lazy` mode, the block manager starts building a block when any transaction becomes available in the mempool. After the first notification of the transaction availability, the manager will wait for a 1 second timer to finish, in order to collect as many transactions from the mempool as possible. The 1 second delay is chosen in accordance with the default block time of 1s. The block manager also notifies the full node after every lazy block building. +In `lazy` mode, the block manager implements a dual timer mechanism: + +1. A `blockTimer` that triggers block production at regular intervals when transactions are available +2. A `lazyTimer` that ensures blocks are produced at `LazyBlockInterval` intervals even during periods of inactivity + +The block manager starts building a block when any transaction becomes available in the mempool via a notification channel (`txNotifyCh`). When the `Reaper` detects new transactions, it calls `Manager.NotifyNewTransactions()`, which performs a non-blocking signal on this channel. The block manager also produces empty blocks at regular intervals to maintain consistency with the DA layer, ensuring a 1:1 mapping between DA layer blocks and execution layer blocks. 
#### Building the Block @@ -154,6 +160,12 @@ The communication between the full node and block manager: * The block manager loads the initial state from the local store and uses genesis if not found in the local store, when the node (re)starts. * The default mode for sequencer nodes is normal (not lazy). * The sequencer can produce empty blocks. +* In lazy aggregation mode, the block manager maintains consistency with the DA layer by producing empty blocks at regular intervals, ensuring a 1:1 mapping between DA layer blocks and execution layer blocks. +* The lazy aggregation mechanism uses a dual timer approach: + * A `blockTimer` that triggers block production when transactions are available + * A `lazyTimer` that ensures blocks are produced even during periods of inactivity +* Empty batches are handled differently in lazy mode - instead of discarding them, they are returned with the `ErrNoBatch` error, allowing the caller to create empty blocks with proper timestamps. +* Transaction notifications from the `Reaper` to the `Manager` are handled via a non-blocking notification channel (`txNotifyCh`) to prevent backpressure. * The block manager uses persistent storage (disk) when the `root_dir` and `db_path` configuration parameters are specified in `config.yaml` file under the app directory. If these configuration parameters are not specified, the in-memory storage is used, which will not be persistent if the node stops. * The block manager does not re-apply the block again (in other words, create a new updated state and persist it) when a block was initially applied using P2P block sync, but later was DA included during DA retrieval. The block is only marked DA included in this case. * The data sync store is created by prefixing `dataSync` on the main data store. 
@@ -187,6 +199,8 @@ See [tutorial] for running a multi-node network with both sequencer and non-sequ [8] [Data Availability](./da.md) +[9] [Lazy Aggregation with DA Layer Consistency ADR](../../lazy-adr/adr-021-lazy-aggregation.md) + [maxSubmitAttempts]: https://github.com/rollkit/rollkit/blob/main/block/manager.go#L50 [defaultBlockTime]: https://github.com/rollkit/rollkit/blob/main/block/manager.go#L36 [defaultDABlockTime]: https://github.com/rollkit/rollkit/blob/main/block/manager.go#L33