From 86ed975a360fc0af8a2482ab01654b2a26bc7781 Mon Sep 17 00:00:00 2001 From: tac0turtle Date: Tue, 29 Apr 2025 21:17:32 +0200 Subject: [PATCH 01/21] create lazy aggregation adr proposal --- specs/lazy-adr/adr-021-lazy-aggregation.md | 160 +++++++++++++++++++++ 1 file changed, 160 insertions(+) create mode 100644 specs/lazy-adr/adr-021-lazy-aggregation.md diff --git a/specs/lazy-adr/adr-021-lazy-aggregation.md b/specs/lazy-adr/adr-021-lazy-aggregation.md new file mode 100644 index 0000000000..f37053907b --- /dev/null +++ b/specs/lazy-adr/adr-021-lazy-aggregation.md @@ -0,0 +1,160 @@ +# ADR 021: Lazy Aggregation with DA Layer Consistency + +## Changelog + +- 2024-01-24: Initial draft + +## Context + +Rollkit's lazy aggregation mechanism currently produces blocks at set intervals when no transactions are present, and immediately when transactions are available. However, this approach creates inconsistency with the DA layer (Celestia) as empty blocks are not posted to the DA layer. This breaks the expected 1:1 mapping between DA layer blocks and execution layer blocks in EVM environments. + +The current implementation in `block/aggregation.go` needs to be enhanced to maintain consistency between DA layer blocks and execution layer blocks, even during periods of inactivity. + +## Alternative Approaches + +### 1. Skip Empty Blocks Entirely + +- Continue current behavior of not posting empty blocks +- **Rejected** because it breaks EVM execution layer expectations and creates inconsistent block mapping + +### 2. Post Full Block Structure for Empty Blocks + +- Post regular blocks with empty transaction lists +- **Rejected** because it wastes DA layer space unnecessarily + +### 3. 
Sentinel Empty Block Marker (Chosen) + +- Post a special single-byte transaction to mark empty blocks +- Optimize DA layer space usage while maintaining block consistency +- Provides clear distinction between empty and failed/missed blocks + +## Decision + +Implement a sentinel-based empty block marking system that: + +1. Posts a special sentinel transaction to the DA layer for empty blocks +2. Maintains block height consistency between DA and execution layers +3. Optimizes DA layer space usage +4. Preserves existing lazy aggregation timing behavior + +## Detailed Design + +### User Requirements + +- Maintain consistent block heights between DA and execution layers +- Minimize DA layer costs for empty blocks +- Preserve existing lazy aggregation performance characteristics +- Support EVM execution layer expectations + +### Systems Affected + +1. Block Manager (`block/aggregation.go`) +2. Transaction Processing Pipeline +3. DA Layer Interface +4. Block Execution Logic + +### Data Structures + +```go +const ( + // EmptyBlockSentinel is a special byte sequence marking empty blocks + EmptyBlockSentinel = []byte{0x00, 0xE0, 0x0E} // "EMPTY" in minimal form +) + +type BlockData struct { + Txs [][]byte + IsEmpty bool +} +``` + +### Implementation Details + +1. **Empty Block Detection**: + + ```go + func (m *Manager) isEmptyBlock(txs [][]byte) bool { + return len(txs) == 1 && bytes.Equal(txs[0], EmptyBlockSentinel) + } + ``` + +2. **Modified Aggregation Loop**: + + ```go + func (m *Manager) lazyAggregationLoop(ctx context.Context, blockTimer *timTimer) { + // ... existing timer setup ... 
+ + for { + select { + case <-ctx.Done(): + return + case <-lazyTimer.C: + // No transactions available, create empty block + if err := m.publishEmptyBlock(ctx); err != nil { + m.logger.Error("error publishing empty block", "error", err) + } + case <-blockTimer.C: + // Normal block production with available transactions + if err := m.publishBlock(ctx); err != nil { + m.logger.Error("error publishing block", "error", err) + } + } + } + } + + func (m *Manager) publishEmptyBlock(ctx context.Context) error { + return m.publishBlock(ctx, EmptyBlockSentinel) + } + ``` + +3. **Transaction Processing**: + + ```go + func (m *Manager) executeTxs(txs [][]byte) ([][]byte, error) { + if m.isEmptyBlock(txs) { + // Skip execution but increment height + return nil, nil + } + // Normal transaction execution + return m.app.ExecuteTxs(txs) + } + ``` + +### Efficiency Considerations + +- Minimal DA layer space usage for empty blocks (3 bytes) +- No additional computational overhead in normal operation +- Maintains existing timing characteristics + +### Security Considerations + +- Sentinel value must be unique and unlikely to appear in normal transactions +- Empty block detection must be consistent across all nodes +- No additional attack surface introduced + +## Status + +Proposed + +## Consequences + +### Positive + +- Maintains consistent block heights between DA and execution layers +- Optimizes DA layer space usage for empty blocks +- Preserves existing lazy aggregation benefits +- Supports EVM execution layer requirements + +### Negative + +- Small additional DA layer cost for empty blocks +- Minor increase in implementation complexity + +### Neutral + +- Requires coordination between nodes to recognize sentinel values +- May require updates to block explorers and tooling + +## References + +- [Current Aggregation Implementation](block/aggregation.go) +- [ADR-013: Single Sequencer](specs/lazy-adr/adr-013-single-sequencer.md) From ed5dce13ded94a351bb524b99810433ef4a3e9b6 Mon Sep 
17 00:00:00 2001 From: tac0turtle Date: Wed, 30 Apr 2025 09:40:53 +0200 Subject: [PATCH 02/21] ammend based on feedback --- specs/lazy-adr/adr-021-lazy-aggregation.md | 178 +++++++++------------ 1 file changed, 75 insertions(+), 103 deletions(-) diff --git a/specs/lazy-adr/adr-021-lazy-aggregation.md b/specs/lazy-adr/adr-021-lazy-aggregation.md index f37053907b..cacc3724b6 100644 --- a/specs/lazy-adr/adr-021-lazy-aggregation.md +++ b/specs/lazy-adr/adr-021-lazy-aggregation.md @@ -3,133 +3,106 @@ ## Changelog - 2024-01-24: Initial draft +- 2024-01-24: Revised to use existing empty batch mechanism ## Context Rollkit's lazy aggregation mechanism currently produces blocks at set intervals when no transactions are present, and immediately when transactions are available. However, this approach creates inconsistency with the DA layer (Celestia) as empty blocks are not posted to the DA layer. This breaks the expected 1:1 mapping between DA layer blocks and execution layer blocks in EVM environments. -The current implementation in `block/aggregation.go` needs to be enhanced to maintain consistency between DA layer blocks and execution layer blocks, even during periods of inactivity. - -## Alternative Approaches - -### 1. Skip Empty Blocks Entirely - -- Continue current behavior of not posting empty blocks -- **Rejected** because it breaks EVM execution layer expectations and creates inconsistent block mapping - -### 2. Post Full Block Structure for Empty Blocks - -- Post regular blocks with empty transaction lists -- **Rejected** because it wastes DA layer space unnecessarily - -### 3. Sentinel Empty Block Marker (Chosen) - -- Post a special single-byte transaction to mark empty blocks -- Optimize DA layer space usage while maintaining block consistency -- Provides clear distinction between empty and failed/missed blocks - ## Decision -Implement a sentinel-based empty block marking system that: - -1. Posts a special sentinel transaction to the DA layer for empty blocks -2. 
Maintains block height consistency between DA and execution layers -3. Optimizes DA layer space usage -4. Preserves existing lazy aggregation timing behavior +Leverage the existing empty batch mechanism and `dataHashForEmptyTxs` to maintain block height consistency. ## Detailed Design -### User Requirements - -- Maintain consistent block heights between DA and execution layers -- Minimize DA layer costs for empty blocks -- Preserve existing lazy aggregation performance characteristics -- Support EVM execution layer expectations - -### Systems Affected - -1. Block Manager (`block/aggregation.go`) -2. Transaction Processing Pipeline -3. DA Layer Interface -4. Block Execution Logic - -### Data Structures - -```go -const ( - // EmptyBlockSentinel is a special byte sequence marking empty blocks - EmptyBlockSentinel = []byte{0x00, 0xE0, 0x0E} // "EMPTY" in minimal form -) - -type BlockData struct { - Txs [][]byte - IsEmpty bool -} -``` - ### Implementation Details -1. **Empty Block Detection**: +1. **Modified Batch Retrieval**: ```go - func (m *Manager) isEmptyBlock(txs [][]byte) bool { - return len(txs) == 1 && bytes.Equal(txs[0], EmptyBlockSentinel) + func (m *Manager) retrieveBatch(ctx context.Context) (*BatchData, error) { + res, err := m.sequencer.GetBatch(ctx) + if err != nil { + return nil, err + } + + if res != nil && res.Batch != nil { + // Even if there are no transactions, return the batch with timestamp + // This allows empty blocks to maintain proper timing + if len(res.Batch.Transactions) == 0 { + return &BatchData{ + Batch: res.Batch, + Time: res.Timestamp, + Data: res.BatchData, + }, ErrNoBatch + } + return &BatchData{ + Batch: res.Batch, + Time: res.Timestamp, + Data: res.BatchData, + }, nil + } + return nil, ErrNoBatch } ``` -2. **Modified Aggregation Loop**: +2. **Header Creation for Empty Blocks**: ```go - func (m *Manager) lazyAggregationLoop(ctx context.Context, blockTimer *timTimer) { - // ... existing timer setup ... 
- - for { - select { - case <-ctx.Done(): - return - case <-lazyTimer.C: - // No transactions available, create empty block - if err := m.publishEmptyBlock(ctx); err != nil { - m.logger.Error("error publishing empty block", "error", err) - } - case <-blockTimer.C: - // Normal block production with available transactions - if err := m.publishBlock(ctx); err != nil { - m.logger.Error("error publishing block", "error", err) - } - } + func (m *Manager) createHeader(ctx context.Context, batchData *BatchData) (*types.Header, error) { + // Use batch timestamp even for empty blocks + timestamp := batchData.Time + + if batchData.Transactions == nil || len(batchData.Transactions) == 0 { + // Use dataHashForEmptyTxs to indicate empty batch + return &types.Header{ + DataHash: dataHashForEmptyTxs, + Timestamp: timestamp, + // ... other header fields + }, nil } - } - - func (m *Manager) publishEmptyBlock(ctx context.Context) error { - return m.publishBlock(ctx, EmptyBlockSentinel) + // Normal header creation for non-empty blocks + // ... } ``` -3. **Transaction Processing**: +3. **Block Syncing**: +The existing sync mechanism already handles empty blocks correctly: ```go - func (m *Manager) executeTxs(txs [][]byte) ([][]byte, error) { - if m.isEmptyBlock(txs) { - // Skip execution but increment height - return nil, nil + func (m *Manager) handleEmptyDataHash(ctx context.Context, header *types.Header) { + // Existing code that handles headers with dataHashForEmptyTxs + // This allows nodes to sync empty blocks without waiting for data + if bytes.Equal(header.DataHash, dataHashForEmptyTxs) { + // ... existing empty block handling } - // Normal transaction execution - return m.app.ExecuteTxs(txs) } ``` +### Key Changes + +1. Return batch with timestamp even when empty, allowing proper block timing +2. Use existing `dataHashForEmptyTxs` for empty block indication +3. Leverage current sync mechanisms that already handle empty blocks +4. 
No new data structures or special transactions needed + ### Efficiency Considerations -- Minimal DA layer space usage for empty blocks (3 bytes) -- No additional computational overhead in normal operation -- Maintains existing timing characteristics +- Minimal DA layer overhead for empty blocks +- Reuses existing empty block detection mechanism +- Maintains proper block timing using batch timestamps + +### Testing Strategy -### Security Considerations +1. **Unit Tests**: + - Empty batch handling + - Timestamp preservation + - Block height consistency -- Sentinel value must be unique and unlikely to appear in normal transactions -- Empty block detection must be consistent across all nodes -- No additional attack surface introduced +2. **Integration Tests**: + - DA layer block mapping + - Empty block syncing + - Block timing verification ## Status @@ -140,21 +113,20 @@ Proposed ### Positive - Maintains consistent block heights between DA and execution layers -- Optimizes DA layer space usage for empty blocks -- Preserves existing lazy aggregation benefits -- Supports EVM execution layer requirements +- Leverages existing empty block mechanisms +- Simpler implementation than sentinel-based approach +- Preserves proper block timing ### Negative -- Small additional DA layer cost for empty blocks -- Minor increase in implementation complexity +- Small DA layer overhead for empty blocks ### Neutral -- Requires coordination between nodes to recognize sentinel values -- May require updates to block explorers and tooling +- Requires careful handling of batch timestamps ## References -- [Current Aggregation Implementation](block/aggregation.go) -- [ADR-013: Single Sequencer](specs/lazy-adr/adr-013-single-sequencer.md) +- [Block Manager Implementation](block/manager.go) +- [Block Sync Implementation](block/sync.go) +- [Existing Empty Block Handling](block/sync.go#L170) From e7472d70bbb2861bef23e5e88380482a6b7a03e6 Mon Sep 17 00:00:00 2001 From: Marko Date: Wed, 30 Apr 2025 
21:54:43 +0200 Subject: [PATCH 03/21] Update specs/lazy-adr/adr-021-lazy-aggregation.md Co-authored-by: graphite-app[bot] <96075541+graphite-app[bot]@users.noreply.github.com> --- specs/lazy-adr/adr-021-lazy-aggregation.md | 47 +++++++++++----------- 1 file changed, 24 insertions(+), 23 deletions(-) diff --git a/specs/lazy-adr/adr-021-lazy-aggregation.md b/specs/lazy-adr/adr-021-lazy-aggregation.md index cacc3724b6..ea4fd0f3ae 100644 --- a/specs/lazy-adr/adr-021-lazy-aggregation.md +++ b/specs/lazy-adr/adr-021-lazy-aggregation.md @@ -20,29 +20,30 @@ Leverage the existing empty batch mechanism and `dataHashForEmptyTxs` to maintai 1. **Modified Batch Retrieval**: ```go - func (m *Manager) retrieveBatch(ctx context.Context) (*BatchData, error) { - res, err := m.sequencer.GetBatch(ctx) - if err != nil { - return nil, err - } - - if res != nil && res.Batch != nil { - // Even if there are no transactions, return the batch with timestamp - // This allows empty blocks to maintain proper timing - if len(res.Batch.Transactions) == 0 { - return &BatchData{ - Batch: res.Batch, - Time: res.Timestamp, - Data: res.BatchData, - }, ErrNoBatch - } - return &BatchData{ - Batch: res.Batch, - Time: res.Timestamp, - Data: res.BatchData, - }, nil - } - return nil, ErrNoBatch +func (m *Manager) retrieveBatch(ctx context.Context) (*BatchData, error) { + res, err := m.sequencer.GetBatch(ctx) + if err != nil { + return nil, err + } + + if res != nil && res.Batch != nil { + // Even if there are no transactions, return the batch with timestamp + // This allows empty blocks to maintain proper timing + if len(res.Batch.Transactions) == 0 { + return &BatchData{ + Batch: res.Batch, + Time: res.Timestamp, + Data: res.BatchData, + }, ErrNoBatch + } + return &BatchData{ + Batch: res.Batch, + Time: res.Timestamp, + Data: res.BatchData, + }, nil + } + return nil, ErrNoBatch +} } ``` From cb5fb906c09ac110d61fe7d90f9b0049dd95285e Mon Sep 17 00:00:00 2001 From: tac0turtle Date: Fri, 2 May 2025 17:15:21 
+0200 Subject: [PATCH 04/21] lint fix --- specs/lazy-adr/adr-007-header-commit-to-shares.md | 3 --- specs/lazy-adr/adr-021-lazy-aggregation.md | 9 +++++---- 2 files changed, 5 insertions(+), 7 deletions(-) diff --git a/specs/lazy-adr/adr-007-header-commit-to-shares.md b/specs/lazy-adr/adr-007-header-commit-to-shares.md index 283fece0ae..acc5d08cde 100644 --- a/specs/lazy-adr/adr-007-header-commit-to-shares.md +++ b/specs/lazy-adr/adr-007-header-commit-to-shares.md @@ -56,13 +56,10 @@ Discarded ### Positive - ### Negative - ### Neutral - ## References 1. diff --git a/specs/lazy-adr/adr-021-lazy-aggregation.md b/specs/lazy-adr/adr-021-lazy-aggregation.md index ea4fd0f3ae..4ff0192bb2 100644 --- a/specs/lazy-adr/adr-021-lazy-aggregation.md +++ b/specs/lazy-adr/adr-021-lazy-aggregation.md @@ -20,11 +20,12 @@ Leverage the existing empty batch mechanism and `dataHashForEmptyTxs` to maintai 1. **Modified Batch Retrieval**: ```go + func (m *Manager) retrieveBatch(ctx context.Context) (*BatchData, error) { - res, err := m.sequencer.GetBatch(ctx) - if err != nil { - return nil, err - } + res, err := m.sequencer.GetBatch(ctx) + if err != nil { + return nil, err + } if res != nil && res.Batch != nil { // Even if there are no transactions, return the batch with timestamp From 7fb554a9c5a9e3f7722fc196af7f7f8c543871a1 Mon Sep 17 00:00:00 2001 From: Marko Date: Fri, 2 May 2025 17:28:15 +0200 Subject: [PATCH 05/21] chore: empty blocks & lazy aggrgation (#2243) ## Overview --------- Co-authored-by: tac0turtle Co-authored-by: Manav Aggarwal --- block/aggregation.go | 44 ++- block/aggregation_test.go | 8 +- block/lazy_aggregation_test.go | 403 +++++++++++++++++++++ block/manager.go | 158 ++++++-- block/publish_block_test.go | 41 ++- block/reaper.go | 12 + node/full.go | 23 +- node/helpers_test.go | 6 +- pkg/cmd/run_node_test.go | 8 +- pkg/config/config.go | 16 +- pkg/config/config_test.go | 14 +- pkg/config/defaults.go | 12 +- specs/lazy-adr/adr-021-lazy-aggregation.md | 177 +++++---- 
13 files changed, 777 insertions(+), 145 deletions(-) create mode 100644 block/lazy_aggregation_test.go diff --git a/block/aggregation.go b/block/aggregation.go index 29dcd86610..c26a3d0503 100644 --- a/block/aggregation.go +++ b/block/aggregation.go @@ -36,7 +36,7 @@ func (m *Manager) AggregationLoop(ctx context.Context) { // Lazy Aggregator mode. // In Lazy Aggregator mode, blocks are built only when there are // transactions or every LazyBlockTime. - if m.config.Node.LazyAggregator { + if m.config.Node.LazyMode { m.lazyAggregationLoop(ctx, blockTimer) return } @@ -55,21 +55,37 @@ func (m *Manager) lazyAggregationLoop(ctx context.Context, blockTimer *time.Time return case <-lazyTimer.C: - case <-blockTimer.C: - } + m.logger.Debug("Lazy timer triggered block production") + m.produceBlock(ctx, "lazy_timer", lazyTimer, blockTimer) - // Reset the start time - start := time.Now() + case <-blockTimer.C: + m.logger.Debug("Block timer triggered block production") + if m.txsAvailable { + m.produceBlock(ctx, "block_timer", lazyTimer, blockTimer) + } + m.txsAvailable = false - // Attempt to publish the block regardless of activity - if err := m.publishBlock(ctx); err != nil && ctx.Err() == nil { - m.logger.Error("error while publishing block", "error", err) + case <-m.txNotifyCh: + m.txsAvailable = true } + } +} + +// produceBlock handles the common logic for producing a block and resetting timers +func (m *Manager) produceBlock(ctx context.Context, trigger string, lazyTimer, blockTimer *time.Timer) { + // Record the start time + start := time.Now() - // Reset both timers for the next aggregation window - lazyTimer.Reset(getRemainingSleep(start, m.config.Node.LazyBlockTime.Duration)) - blockTimer.Reset(getRemainingSleep(start, m.config.Node.BlockTime.Duration)) + // Attempt to publish the block + if err := m.publishBlock(ctx); err != nil && ctx.Err() == nil { + m.logger.Error("error while publishing block", "trigger", trigger, "error", err) + } else { + 
m.logger.Debug("Successfully published block", "trigger", trigger) } + + // Reset both timers for the next aggregation window + lazyTimer.Reset(getRemainingSleep(start, m.config.Node.LazyBlockInterval.Duration)) + blockTimer.Reset(getRemainingSleep(start, m.config.Node.BlockTime.Duration)) } func (m *Manager) normalAggregationLoop(ctx context.Context, blockTimer *time.Timer) { @@ -87,6 +103,12 @@ func (m *Manager) normalAggregationLoop(ctx context.Context, blockTimer *time.Ti // Reset the blockTimer to signal the next block production // period based on the block time. blockTimer.Reset(getRemainingSleep(start, m.config.Node.BlockTime.Duration)) + + case <-m.txNotifyCh: + // Transaction notifications are intentionally ignored in normal mode + // to avoid triggering block production outside the scheduled intervals. + // We just update the txsAvailable flag for tracking purposes + m.txsAvailable = true } } } diff --git a/block/aggregation_test.go b/block/aggregation_test.go index 7d7548c691..42e94745b6 100644 --- a/block/aggregation_test.go +++ b/block/aggregation_test.go @@ -45,8 +45,8 @@ func TestAggregationLoop_Normal_BasicInterval(t *testing.T) { logger: logger, config: config.Config{ Node: config.NodeConfig{ - BlockTime: config.DurationWrapper{Duration: blockTime}, - LazyAggregator: false, + BlockTime: config.DurationWrapper{Duration: blockTime}, + LazyMode: false, }, DA: config.DAConfig{ BlockTime: config.DurationWrapper{Duration: 1 * time.Second}, @@ -142,8 +142,8 @@ func TestAggregationLoop_Normal_PublishBlockError(t *testing.T) { logger: mockLogger, config: config.Config{ Node: config.NodeConfig{ - BlockTime: config.DurationWrapper{Duration: blockTime}, - LazyAggregator: false, + BlockTime: config.DurationWrapper{Duration: blockTime}, + LazyMode: false, }, DA: config.DAConfig{ BlockTime: config.DurationWrapper{Duration: 1 * time.Second}, diff --git a/block/lazy_aggregation_test.go b/block/lazy_aggregation_test.go new file mode 100644 index 
0000000000..3c2ff77985 --- /dev/null +++ b/block/lazy_aggregation_test.go @@ -0,0 +1,403 @@ +package block + +import ( + "context" + "errors" + "sync" + "testing" + "time" + + "cosmossdk.io/log" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + + "github.com/rollkit/rollkit/pkg/config" +) + +// mockPublishBlock is used to control the behavior of publishBlock during tests +type mockPublishBlock struct { + mu sync.Mutex + calls chan struct{} + err error + delay time.Duration // Optional delay to simulate processing time +} + +// reset clears the calls channel in mockPublishBlock. +func (m *mockPublishBlock) reset() { + m.mu.Lock() + defer m.mu.Unlock() + // Clear the channel + for len(m.calls) > 0 { + <-m.calls + } +} + +func (m *mockPublishBlock) publish(ctx context.Context) error { + m.mu.Lock() + err := m.err + delay := m.delay + m.mu.Unlock() + + if delay > 0 { + time.Sleep(delay) + } + // Non-blocking send in case the channel buffer is full or receiver is not ready + select { + case m.calls <- struct{}{}: + default: + } + return err +} + +func setupTestManager(t *testing.T, blockTime, lazyTime time.Duration) (*Manager, *mockPublishBlock) { + t.Helper() + pubMock := &mockPublishBlock{ + calls: make(chan struct{}, 10), // Buffer to avoid blocking in tests + } + logger := log.NewTestLogger(t) + m := &Manager{ + logger: logger, + config: config.Config{ + Node: config.NodeConfig{ + BlockTime: config.DurationWrapper{Duration: blockTime}, + LazyBlockInterval: config.DurationWrapper{Duration: lazyTime}, + LazyMode: true, // Ensure lazy mode is active + }, + }, + publishBlock: pubMock.publish, + } + return m, pubMock +} + +// TestLazyAggregationLoop_BlockTimerTrigger tests that a block is published when the blockTimer fires first. 
+func TestLazyAggregationLoop_BlockTimerTrigger(t *testing.T) { + require := require.New(t) + + // Create a mock for the publishBlock function that counts calls + callCount := 0 + mockPublishFn := func(ctx context.Context) error { + callCount++ + return nil + } + + // Setup a manager with our mock publish function + blockTime := 50 * time.Millisecond + lazyTime := 200 * time.Millisecond // Lazy timer fires later + m, _ := setupTestManager(t, blockTime, lazyTime) + m.publishBlock = mockPublishFn + + // Set txsAvailable to true to ensure block timer triggers block production + m.txsAvailable = true + + ctx, cancel := context.WithCancel(context.Background()) + + // Start the lazy aggregation loop + var wg sync.WaitGroup + wg.Add(1) + go func() { + defer wg.Done() + blockTimer := time.NewTimer(0) // Fire immediately first time + defer blockTimer.Stop() + m.lazyAggregationLoop(ctx, blockTimer) + }() + + // Wait for at least one block to be published + time.Sleep(blockTime * 2) + + // Cancel the context to stop the loop + cancel() + wg.Wait() + + // Verify that at least one block was published + require.GreaterOrEqual(callCount, 1, "Expected at least one block to be published") +} + +// TestLazyAggregationLoop_LazyTimerTrigger tests that a block is published when the lazyTimer fires first. 
+func TestLazyAggregationLoop_LazyTimerTrigger(t *testing.T) { + assert := assert.New(t) + require := require.New(t) + + blockTime := 200 * time.Millisecond // Block timer fires later + lazyTime := 50 * time.Millisecond + m, pubMock := setupTestManager(t, blockTime, lazyTime) + + // Set txsAvailable to false to ensure lazy timer triggers block production + // and block timer doesn't + m.txsAvailable = false + + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + var wg sync.WaitGroup + wg.Add(1) + go func() { + defer wg.Done() + // Use real timers for this test + blockTimer := time.NewTimer(0) // Fire immediately first time + defer blockTimer.Stop() + m.lazyAggregationLoop(ctx, blockTimer) + }() + + // Wait for the first publish call triggered by the initial immediate lazyTimer fire + select { + case <-pubMock.calls: + // Good, first block published by lazy timer + case <-time.After(2 * lazyTime): // Give some buffer + require.Fail("timed out waiting for first block publication") + } + + // Wait for the second publish call, triggered by lazyTimer reset + select { + case <-pubMock.calls: + // Good, second block published by lazyTimer + case <-time.After(2 * lazyTime): // Give some buffer + require.Fail("timed out waiting for second block publication (lazyTimer)") + } + + // Ensure blockTimer didn't trigger a publish yet (since txsAvailable is false) + assert.Len(pubMock.calls, 0, "Expected no more publish calls yet") + + cancel() + wg.Wait() +} + +// TestLazyAggregationLoop_PublishError tests that the loop continues after a publish error. 
+func TestLazyAggregationLoop_PublishError(t *testing.T) { + require := require.New(t) + + blockTime := 50 * time.Millisecond + lazyTime := 100 * time.Millisecond + m, pubMock := setupTestManager(t, blockTime, lazyTime) + + pubMock.mu.Lock() + pubMock.err = errors.New("publish failed") + pubMock.mu.Unlock() + + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + var wg sync.WaitGroup + wg.Add(1) + go func() { + defer wg.Done() + // Use real timers + blockTimer := time.NewTimer(0) + defer blockTimer.Stop() + m.lazyAggregationLoop(ctx, blockTimer) + }() + + // Wait for the first publish attempt (which will fail) + select { + case <-pubMock.calls: + case <-time.After(2 * blockTime): + require.Fail("timed out waiting for first block publication attempt") + } + + // Remove the error for subsequent calls + pubMock.mu.Lock() + pubMock.err = nil + pubMock.mu.Unlock() + + // Wait for the second publish attempt (should succeed) + // Use a longer timeout since we need to wait for either the lazy timer or block timer to fire + select { + case <-pubMock.calls: + case <-time.After(2 * lazyTime): // Use the longer of the two timers with some buffer + require.Fail("timed out waiting for second block publication attempt after error") + } + + cancel() + wg.Wait() +} + +// TestGetRemainingSleep tests the calculation of sleep duration. 
+func TestGetRemainingSleep(t *testing.T) { + assert := assert.New(t) + + interval := 100 * time.Millisecond + + // Case 1: Elapsed time is less than interval + start1 := time.Now().Add(-30 * time.Millisecond) // Started 30ms ago + sleep1 := getRemainingSleep(start1, interval) + // Expecting interval - elapsed = 100ms - 30ms = 70ms (allow for slight variations) + assert.InDelta(interval-30*time.Millisecond, sleep1, float64(5*time.Millisecond), "Case 1 failed") + + // Case 2: Elapsed time is greater than or equal to interval + start2 := time.Now().Add(-120 * time.Millisecond) // Started 120ms ago + sleep2 := getRemainingSleep(start2, interval) + // Expecting minimum sleep time + assert.Equal(time.Millisecond, sleep2, "Case 2 failed") + + // Case 3: Elapsed time is exactly the interval + start3 := time.Now().Add(-100 * time.Millisecond) // Started 100ms ago + sleep3 := getRemainingSleep(start3, interval) + // Expecting minimum sleep time + assert.Equal(time.Millisecond, sleep3, "Case 3 failed") + + // Case 4: Zero elapsed time + start4 := time.Now() + sleep4 := getRemainingSleep(start4, interval) + assert.InDelta(interval, sleep4, float64(5*time.Millisecond), "Case 4 failed") +} + +// TestLazyAggregationLoop_TxNotification tests that transaction notifications trigger block production in lazy mode +func TestLazyAggregationLoop_TxNotification(t *testing.T) { + require := require.New(t) + + blockTime := 200 * time.Millisecond + lazyTime := 500 * time.Millisecond + m, pubMock := setupTestManager(t, blockTime, lazyTime) + m.config.Node.LazyMode = true + + // Create the notification channel + m.txNotifyCh = make(chan struct{}, 1) + + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + var wg sync.WaitGroup + wg.Add(1) + go func() { + defer wg.Done() + // Start with a timer that won't fire immediately + blockTimer := time.NewTimer(blockTime) + defer blockTimer.Stop() + m.lazyAggregationLoop(ctx, blockTimer) + }() + + // Wait for the initial lazy 
timer to fire and publish a block + select { + case <-pubMock.calls: + // Initial block was published by lazy timer + case <-time.After(100 * time.Millisecond): + require.Fail("Initial block was not published") + } + + // Reset the mock to track new calls + pubMock.reset() + + // Wait a bit to ensure the loop is running with reset timers + time.Sleep(20 * time.Millisecond) + + // Send a transaction notification + m.NotifyNewTransactions() + + // Wait for the block timer to fire and check txsAvailable + select { + case <-pubMock.calls: + // Block was published, which is what we expect + case <-time.After(blockTime + 50*time.Millisecond): + require.Fail("Block was not published after transaction notification") + } + + // Reset the mock again + pubMock.reset() + + // Send another notification immediately + m.NotifyNewTransactions() + + // Wait for the next block timer to fire + select { + case <-pubMock.calls: + // Block was published after notification + case <-time.After(blockTime + 50*time.Millisecond): + require.Fail("Block was not published after second notification") + } + + cancel() + wg.Wait() +} + +// TestEmptyBlockCreation tests that empty blocks are created with the correct dataHash +func TestEmptyBlockCreation(t *testing.T) { + require := require.New(t) + + // Create a mock for the publishBlock function that captures the context + var capturedCtx context.Context + mockPublishFn := func(ctx context.Context) error { + capturedCtx = ctx + return nil + } + + // Setup a manager with our mock publish function + blockTime := 50 * time.Millisecond + lazyTime := 100 * time.Millisecond + m, _ := setupTestManager(t, blockTime, lazyTime) + m.publishBlock = mockPublishFn + + // Create a context we can cancel + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + // Create timers for the test + lazyTimer := time.NewTimer(lazyTime) + blockTimer := time.NewTimer(blockTime) + defer lazyTimer.Stop() + defer blockTimer.Stop() + + // Call produceBlock 
directly to test empty block creation + m.produceBlock(ctx, "test_trigger", lazyTimer, blockTimer) + + // Verify that the context was passed correctly + require.NotNil(capturedCtx, "Context should have been captured by mock publish function") + require.Equal(ctx, capturedCtx, "Context should match the one passed to produceBlock") +} + +// TestNormalAggregationLoop_TxNotification tests that transaction notifications are handled in normal mode +func TestNormalAggregationLoop_TxNotification(t *testing.T) { + require := require.New(t) + + blockTime := 100 * time.Millisecond + m, pubMock := setupTestManager(t, blockTime, 0) + m.config.Node.LazyMode = false + + // Create the notification channel + m.txNotifyCh = make(chan struct{}, 1) + + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + var wg sync.WaitGroup + wg.Add(1) + go func() { + defer wg.Done() + blockTimer := time.NewTimer(blockTime) + defer blockTimer.Stop() + m.normalAggregationLoop(ctx, blockTimer) + }() + + // Wait for the first block to be published by the timer + select { + case <-pubMock.calls: + // Block was published by timer, which is expected + case <-time.After(blockTime * 2): + require.Fail("Block was not published by timer") + } + + // Reset the publish mock to track new calls + pubMock.reset() + + // Send a transaction notification + m.NotifyNewTransactions() + + // In normal mode, the notification should not trigger an immediate block + select { + case <-pubMock.calls: + // If we enable the optional enhancement to reset the timer, this might happen + // But with the current implementation, this should not happen + require.Fail("Block was published immediately after notification in normal mode") + case <-time.After(blockTime / 2): + // This is expected - no immediate block + } + + // Wait for the next regular block + select { + case <-pubMock.calls: + // Block was published by timer, which is expected + case <-time.After(blockTime * 2): + require.Fail("Block was not 
published by timer after notification") + } + + cancel() + wg.Wait() +} diff --git a/block/manager.go b/block/manager.go index a6d96183ca..c3ebad7c6c 100644 --- a/block/manager.go +++ b/block/manager.go @@ -130,7 +130,7 @@ type Manager struct { logger log.Logger // For usage by Lazy Aggregator mode - buildingBlock bool + txsAvailable bool pendingHeaders *PendingHeaders @@ -152,6 +152,9 @@ type Manager struct { // publishBlock is the function used to publish blocks. It defaults to // the manager's publishBlock method but can be overridden for testing. publishBlock publishBlockFunc + + // txNotifyCh is used to signal when new transactions are available + txNotifyCh chan struct{} } // getInitialState tries to load lastState from Store, and if it's not available it reads genesis. @@ -285,9 +288,9 @@ func NewManager( config.Node.BlockTime.Duration = defaultBlockTime } - if config.Node.LazyBlockTime.Duration == 0 { + if config.Node.LazyBlockInterval.Duration == 0 { logger.Info("Using default lazy block time", "LazyBlockTime", defaultLazyBlockTime) - config.Node.LazyBlockTime.Duration = defaultLazyBlockTime + config.Node.LazyBlockInterval.Duration = defaultLazyBlockTime } if config.DA.MempoolTTL == 0 { @@ -346,13 +349,14 @@ func NewManager( dataCache: cache.NewCache[types.Data](), retrieveCh: make(chan struct{}, 1), logger: logger, - buildingBlock: false, + txsAvailable: false, pendingHeaders: pendingHeaders, metrics: seqMetrics, sequencer: sequencer, exec: exec, gasPrice: gasPrice, gasMultiplier: gasMultiplier, + txNotifyCh: make(chan struct{}, 1), // Non-blocking channel } agg.init(ctx) // Set the default publishBlock implementation @@ -463,8 +467,14 @@ func (m *Manager) retrieveBatch(ctx context.Context) (*BatchData, error) { "txCount", len(res.Batch.Transactions), "timestamp", res.Timestamp) + // Even if there are no transactions, return the batch with timestamp + // This allows empty blocks to maintain proper timing if len(res.Batch.Transactions) == 0 { - return nil, 
ErrNoBatch + return &BatchData{ + Batch: res.Batch, + Time: res.Timestamp, + Data: res.BatchData, + }, ErrNoBatch } h := convertBatchDataToBytes(res.BatchData) if err := m.store.SetMetadata(ctx, LastBatchDataKey, h); err != nil { @@ -501,10 +511,12 @@ func (m *Manager) publishBlockInternal(ctx context.Context) error { lastHeaderTime time.Time err error ) + height, err := m.store.Height(ctx) if err != nil { - return fmt.Errorf("error while getting store height: %w", err) + return fmt.Errorf("error while getting height: %w", err) } + newHeight := height + 1 // this is a special case, when first block is produced - there is no previous commit if newHeight <= m.genesis.InitialHeight { @@ -540,26 +552,44 @@ func (m *Manager) publishBlockInternal(ctx context.Context) error { } else { batchData, err := m.retrieveBatch(ctx) if errors.Is(err, ErrNoBatch) { - m.logger.Info("No batch retrieved from sequencer, skipping block production") - return nil // Indicate no block was produced + // Even with no transactions, we still want to create a block with an empty batch + if batchData != nil { + m.logger.Info("Creating empty block", "height", newHeight) + + // Create an empty block + header, data, err = m.createBlock(ctx, newHeight, lastSignature, lastHeaderHash, batchData) + if err != nil { + return err + } + + err = m.store.SaveBlockData(ctx, header, data, &signature) + if err != nil { + return SaveBlockError{err} + } + } else { + // If we don't have a batch at all (not even an empty one), skip block production + m.logger.Info("No batch retrieved from sequencer, skipping block production") + return nil + } } else if err != nil { return fmt.Errorf("failed to get transactions from batch: %w", err) - } - - // sanity check timestamp for monotonically increasing - if batchData.Before(lastHeaderTime) { - return fmt.Errorf("timestamp is not monotonically increasing: %s < %s", batchData.Time, m.getLastBlockTime()) - } - m.logger.Info("Creating and publishing block", "height", newHeight) 
- header, data, err = m.createBlock(ctx, newHeight, lastSignature, lastHeaderHash, batchData) - if err != nil { - return err - } - m.logger.Debug("block info", "num_tx", len(data.Txs)) + } else { + // We have a batch with transactions + // sanity check timestamp for monotonically increasing + if batchData.Before(lastHeaderTime) { + return fmt.Errorf("timestamp is not monotonically increasing: %s < %s", batchData.Time, m.getLastBlockTime()) + } + m.logger.Info("Creating and publishing block", "height", newHeight) + header, data, err = m.createBlock(ctx, newHeight, lastSignature, lastHeaderHash, batchData) + if err != nil { + return err + } + m.logger.Debug("block info", "num_tx", len(data.Txs)) - err = m.store.SaveBlockData(ctx, header, data, &signature) - if err != nil { - return SaveBlockError{err} + err = m.store.SaveBlockData(ctx, header, data, &signature) + if err != nil { + return SaveBlockError{err} + } } } @@ -679,7 +709,74 @@ func (m *Manager) getLastBlockTime() time.Time { func (m *Manager) createBlock(ctx context.Context, height uint64, lastSignature *types.Signature, lastHeaderHash types.Hash, batchData *BatchData) (*types.SignedHeader, *types.Data, error) { m.lastStateMtx.RLock() defer m.lastStateMtx.RUnlock() - return m.execCreateBlock(ctx, height, lastSignature, lastHeaderHash, m.lastState, batchData) + + if m.signer == nil { + return nil, nil, fmt.Errorf("signer is nil; cannot create block") + } + + key, err := m.signer.GetPublic() + if err != nil { + return nil, nil, fmt.Errorf("failed to get proposer public key: %w", err) + } + + // check that the proposer address is the same as the genesis proposer address + address, err := m.signer.GetAddress() + if err != nil { + return nil, nil, fmt.Errorf("failed to get proposer address: %w", err) + } + if !bytes.Equal(m.genesis.ProposerAddress, address) { + return nil, nil, fmt.Errorf("proposer address is not the same as the genesis proposer address %x != %x", address, m.genesis.ProposerAddress) + } + + // 
Determine if this is an empty block + isEmpty := batchData.Batch == nil || len(batchData.Batch.Transactions) == 0 + + // Set the appropriate data hash based on whether this is an empty block + var dataHash types.Hash + if isEmpty { + dataHash = dataHashForEmptyTxs // Use dataHashForEmptyTxs for empty blocks + } else { + dataHash = convertBatchDataToBytes(batchData.Data) + } + + header := &types.SignedHeader{ + Header: types.Header{ + Version: types.Version{ + Block: m.lastState.Version.Block, + App: m.lastState.Version.App, + }, + BaseHeader: types.BaseHeader{ + ChainID: m.lastState.ChainID, + Height: height, + Time: uint64(batchData.UnixNano()), //nolint:gosec // why is time unix? (tac0turtle) + }, + LastHeaderHash: lastHeaderHash, + DataHash: dataHash, + ConsensusHash: make(types.Hash, 32), + AppHash: m.lastState.AppHash, + ProposerAddress: m.genesis.ProposerAddress, + }, + Signature: *lastSignature, + Signer: types.Signer{ + PubKey: key, + Address: m.genesis.ProposerAddress, + }, + } + + // Create block data with appropriate transactions + blockData := &types.Data{ + Txs: make(types.Txs, 0), // Start with empty transaction list + } + + // Only add transactions if this is not an empty block + if !isEmpty { + blockData.Txs = make(types.Txs, len(batchData.Batch.Transactions)) + for i := range batchData.Batch.Transactions { + blockData.Txs[i] = types.Tx(batchData.Batch.Transactions[i]) + } + } + + return header, blockData, nil } func (m *Manager) applyBlock(ctx context.Context, header *types.SignedHeader, data *types.Data) (types.State, error) { @@ -864,3 +961,16 @@ func (m *Manager) getSignature(header types.Header) (types.Signature, error) { } return m.signer.Sign(b) } + +// NotifyNewTransactions signals that new transactions are available for processing +// This method will be called by the Reaper when it receives new transactions +func (m *Manager) NotifyNewTransactions() { + // Non-blocking send to avoid slowing down the transaction submission path + select { + 
case m.txNotifyCh <- struct{}{}: + // Successfully sent notification + default: + // Channel buffer is full, which means a notification is already pending + // This is fine, as we just need to trigger one block production + } +} diff --git a/block/publish_block_test.go b/block/publish_block_test.go index f3440250c5..47608f020c 100644 --- a/block/publish_block_test.go +++ b/block/publish_block_test.go @@ -227,6 +227,9 @@ func Test_publishBlock_EmptyBatch(t *testing.T) { noopSigner, err := noopsigner.NewNoopSigner(privKey) require.NoError(err) + daH := atomic.Uint64{} + daH.Store(0) + m := &Manager{ store: mockStore, sequencer: mockSeq, @@ -245,6 +248,18 @@ func Test_publishBlock_EmptyBatch(t *testing.T) { }, lastStateMtx: &sync.RWMutex{}, metrics: NopMetrics(), + lastState: types.State{ + ChainID: chainID, + InitialHeight: 1, + LastBlockHeight: 1, + LastBlockTime: time.Now(), + AppHash: []byte("initialAppHash"), + }, + headerCache: cache.NewCache[types.SignedHeader](), + dataCache: cache.NewCache[types.Data](), + HeaderCh: make(chan *types.SignedHeader, 1), + DataCh: make(chan *types.Data, 1), + daHeight: &daH, } m.publishBlock = m.publishBlockInternal @@ -279,19 +294,31 @@ func Test_publishBlock_EmptyBatch(t *testing.T) { }) mockSeq.On("GetNextBatch", ctx, batchReqMatcher).Return(emptyBatchResponse, nil).Once() + // With our new implementation, we should expect SaveBlockData to be called for empty blocks + mockStore.On("SaveBlockData", ctx, mock.AnythingOfType("*types.SignedHeader"), mock.AnythingOfType("*types.Data"), mock.AnythingOfType("*types.Signature")).Return(nil).Once() + + // We should also expect ExecuteTxs to be called with an empty transaction list + newAppHash := []byte("newAppHash") + mockExec.On("ExecuteTxs", ctx, mock.Anything, currentHeight+1, mock.AnythingOfType("time.Time"), m.lastState.AppHash).Return(newAppHash, uint64(100), nil).Once() + + // SetFinal should be called + mockExec.On("SetFinal", ctx, currentHeight+1).Return(nil).Once() + + // 
SetHeight should be called + mockStore.On("SetHeight", ctx, currentHeight+1).Return(nil).Once() + + // UpdateState should be called + mockStore.On("UpdateState", ctx, mock.AnythingOfType("types.State")).Return(nil).Once() + + // SaveBlockData should be called again after validation + mockStore.On("SaveBlockData", ctx, mock.AnythingOfType("*types.SignedHeader"), mock.AnythingOfType("*types.Data"), mock.AnythingOfType("*types.Signature")).Return(nil).Once() + // Call publishBlock err = m.publishBlock(ctx) // Assertions require.NoError(err, "publishBlock should return nil error when the batch is empty") - // Verify mocks: Ensure methods after the check were NOT called - mockStore.AssertNotCalled(t, "SaveBlockData", mock.Anything, mock.Anything, mock.Anything, mock.Anything) - mockExec.AssertNotCalled(t, "ExecuteTxs", mock.Anything, mock.Anything, mock.Anything, mock.Anything, mock.Anything) - mockExec.AssertNotCalled(t, "SetFinal", mock.Anything, mock.Anything) - mockStore.AssertNotCalled(t, "SetHeight", mock.Anything, mock.Anything) - mockStore.AssertNotCalled(t, "UpdateState", mock.Anything, mock.Anything) - mockSeq.AssertExpectations(t) mockStore.AssertExpectations(t) mockExec.AssertExpectations(t) diff --git a/block/reaper.go b/block/reaper.go index a517747bf0..5c18c1cd29 100644 --- a/block/reaper.go +++ b/block/reaper.go @@ -27,6 +27,7 @@ type Reaper struct { logger log.Logger ctx context.Context seenStore ds.Batching + manager *Manager } // NewReaper creates a new Reaper instance with persistent seenTx storage. @@ -45,6 +46,11 @@ func NewReaper(ctx context.Context, exec coreexecutor.Executor, sequencer corese } } +// SetManager sets the Manager reference for transaction notifications +func (r *Reaper) SetManager(manager *Manager) { + r.manager = manager +} + // Start begins the reaping process at the specified interval. 
func (r *Reaper) Start(ctx context.Context) { r.ctx = ctx @@ -110,6 +116,12 @@ func (r *Reaper) SubmitTxs() { } } + // Notify the manager that new transactions are available + if r.manager != nil && len(newTxs) > 0 { + r.logger.Debug("Notifying manager of new transactions") + r.manager.NotifyNewTransactions() + } + r.logger.Debug("Reaper successfully submitted txs") } diff --git a/node/full.go b/node/full.go index 2609630f7f..98dd0f79a5 100644 --- a/node/full.go +++ b/node/full.go @@ -96,16 +96,6 @@ func newFullNode( store := store.New(mainKV) - reaper := block.NewReaper( - ctx, - exec, - sequencer, - genesis.ChainID, - nodeConfig.Node.BlockTime.Duration, - logger.With("module", "Reaper"), - mainKV, - ) - blockManager, err := initBlockManager( ctx, signer, @@ -126,6 +116,19 @@ func newFullNode( return nil, err } + reaper := block.NewReaper( + ctx, + exec, + sequencer, + genesis.ChainID, + nodeConfig.Node.BlockTime.Duration, + logger.With("module", "Reaper"), + mainKV, + ) + + // Connect the reaper to the manager for transaction notifications + reaper.SetManager(blockManager) + node := &FullNode{ genesis: genesis, nodeConfig: nodeConfig, diff --git a/node/helpers_test.go b/node/helpers_test.go index f334db00dc..226fb8e229 100644 --- a/node/helpers_test.go +++ b/node/helpers_test.go @@ -22,9 +22,9 @@ func getTestConfig(t *testing.T, n int) rollkitconfig.Config { return rollkitconfig.Config{ RootDir: t.TempDir(), Node: rollkitconfig.NodeConfig{ - Aggregator: true, - BlockTime: rollkitconfig.DurationWrapper{Duration: 500 * time.Millisecond}, - LazyBlockTime: rollkitconfig.DurationWrapper{Duration: 5 * time.Second}, + Aggregator: true, + BlockTime: rollkitconfig.DurationWrapper{Duration: 500 * time.Millisecond}, + LazyBlockInterval: rollkitconfig.DurationWrapper{Duration: 5 * time.Second}, }, DA: rollkitconfig.DAConfig{ Address: MockDAAddress, diff --git a/pkg/cmd/run_node_test.go b/pkg/cmd/run_node_test.go index 646387cb1b..b4e6e5ef43 100644 --- 
a/pkg/cmd/run_node_test.go +++ b/pkg/cmd/run_node_test.go @@ -62,8 +62,8 @@ func TestParseFlags(t *testing.T) { "--rollkit.da.mempool_ttl", "10", "--rollkit.da.namespace", "namespace", "--rollkit.da.start_height", "100", - "--rollkit.node.lazy_aggregator", - "--rollkit.node.lazy_block_time", "2m", + "--rollkit.node.lazy_mode", + "--rollkit.node.lazy_block_interval", "2m", "--rollkit.node.light", "--rollkit.node.max_pending_blocks", "100", "--rollkit.node.trusted_hash", "abcdef1234567890", @@ -122,8 +122,8 @@ func TestParseFlags(t *testing.T) { {"DAMempoolTTL", nodeConfig.DA.MempoolTTL, uint64(10)}, {"DANamespace", nodeConfig.DA.Namespace, "namespace"}, {"DAStartHeight", nodeConfig.DA.StartHeight, uint64(100)}, - {"LazyAggregator", nodeConfig.Node.LazyAggregator, true}, - {"LazyBlockTime", nodeConfig.Node.LazyBlockTime.Duration, 2 * time.Minute}, + {"LazyAggregator", nodeConfig.Node.LazyMode, true}, + {"LazyBlockTime", nodeConfig.Node.LazyBlockInterval.Duration, 2 * time.Minute}, {"Light", nodeConfig.Node.Light, true}, {"MaxPendingBlocks", nodeConfig.Node.MaxPendingBlocks, uint64(100)}, {"TrustedHash", nodeConfig.Node.TrustedHash, "abcdef1234567890"}, diff --git a/pkg/config/config.go b/pkg/config/config.go index ee1a34bf08..7394222967 100644 --- a/pkg/config/config.go +++ b/pkg/config/config.go @@ -37,11 +37,11 @@ const ( // FlagTrustedHash is a flag for specifying the trusted hash FlagTrustedHash = "rollkit.node.trusted_hash" // FlagLazyAggregator is a flag for enabling lazy aggregation mode that only produces blocks when transactions are available - FlagLazyAggregator = "rollkit.node.lazy_aggregator" + FlagLazyAggregator = "rollkit.node.lazy_mode" // FlagMaxPendingBlocks is a flag to limit and pause block production when too many blocks are waiting for DA confirmation FlagMaxPendingBlocks = "rollkit.node.max_pending_blocks" // FlagLazyBlockTime is a flag for specifying the maximum interval between blocks in lazy aggregation mode - FlagLazyBlockTime = 
"rollkit.node.lazy_block_time" + FlagLazyBlockTime = "rollkit.node.lazy_block_interval" // Data Availability configuration flags @@ -162,10 +162,10 @@ type NodeConfig struct { Light bool `yaml:"light" comment:"Run node in light mode"` // Block management configuration - BlockTime DurationWrapper `mapstructure:"block_time" yaml:"block_time" comment:"Block time (duration). Examples: \"500ms\", \"1s\", \"5s\", \"1m\", \"2m30s\", \"10m\"."` - MaxPendingBlocks uint64 `mapstructure:"max_pending_blocks" yaml:"max_pending_blocks" comment:"Maximum number of blocks pending DA submission. When this limit is reached, the aggregator pauses block production until some blocks are confirmed. Use 0 for no limit."` - LazyAggregator bool `mapstructure:"lazy_aggregator" yaml:"lazy_aggregator" comment:"Enables lazy aggregation mode, where blocks are only produced when transactions are available or after LazyBlockTime. Optimizes resources by avoiding empty block creation during periods of inactivity."` - LazyBlockTime DurationWrapper `mapstructure:"lazy_block_time" yaml:"lazy_block_time" comment:"Maximum interval between blocks in lazy aggregation mode (LazyAggregator). Ensures blocks are produced periodically even without transactions to keep the chain active. Generally larger than BlockTime."` + BlockTime DurationWrapper `mapstructure:"block_time" yaml:"block_time" comment:"Block time (duration). Examples: \"500ms\", \"1s\", \"5s\", \"1m\", \"2m30s\", \"10m\"."` + MaxPendingBlocks uint64 `mapstructure:"max_pending_blocks" yaml:"max_pending_blocks" comment:"Maximum number of blocks pending DA submission. When this limit is reached, the aggregator pauses block production until some blocks are confirmed. Use 0 for no limit."` + LazyMode bool `mapstructure:"lazy_mode" yaml:"lazy_mode" comment:"Enables lazy aggregation mode, where blocks are only produced when transactions are available or after LazyBlockTime. 
Optimizes resources by avoiding empty block creation during periods of inactivity."` + LazyBlockInterval DurationWrapper `mapstructure:"lazy_block_interval" yaml:"lazy_block_interval" comment:"Maximum interval between blocks in lazy aggregation mode (LazyAggregator). Ensures blocks are produced periodically even without transactions to keep the chain active. Generally larger than BlockTime."` // Header configuration TrustedHash string `mapstructure:"trusted_hash" yaml:"trusted_hash" comment:"Initial trusted hash used to bootstrap the header exchange service. Allows nodes to start synchronizing from a specific trusted point in the chain instead of genesis. When provided, the node will fetch the corresponding header/block from peers using this hash and use it as a starting point for synchronization. If not provided, the node will attempt to fetch the genesis block instead."` @@ -240,9 +240,9 @@ func AddFlags(cmd *cobra.Command) { cmd.Flags().Bool(FlagLight, def.Node.Light, "run light client") cmd.Flags().Duration(FlagBlockTime, def.Node.BlockTime.Duration, "block time (for aggregator mode)") cmd.Flags().String(FlagTrustedHash, def.Node.TrustedHash, "initial trusted hash to start the header exchange service") - cmd.Flags().Bool(FlagLazyAggregator, def.Node.LazyAggregator, "produce blocks only when transactions are available or after lazy block time") + cmd.Flags().Bool(FlagLazyAggregator, def.Node.LazyMode, "produce blocks only when transactions are available or after lazy block time") cmd.Flags().Uint64(FlagMaxPendingBlocks, def.Node.MaxPendingBlocks, "maximum blocks pending DA confirmation before pausing block production (0 for no limit)") - cmd.Flags().Duration(FlagLazyBlockTime, def.Node.LazyBlockTime.Duration, "maximum interval between blocks in lazy aggregation mode") + cmd.Flags().Duration(FlagLazyBlockTime, def.Node.LazyBlockInterval.Duration, "maximum interval between blocks in lazy aggregation mode") // Data Availability configuration flags 
cmd.Flags().String(FlagDAAddress, def.DA.Address, "DA address (host:port)") diff --git a/pkg/config/config_test.go b/pkg/config/config_test.go index 7974a1f0a6..dbd870c890 100644 --- a/pkg/config/config_test.go +++ b/pkg/config/config_test.go @@ -31,8 +31,8 @@ func TestDefaultConfig(t *testing.T) { assert.Equal(t, uint64(0), def.DA.StartHeight) assert.Equal(t, uint64(0), def.DA.MempoolTTL) assert.Equal(t, uint64(0), def.Node.MaxPendingBlocks) - assert.Equal(t, false, def.Node.LazyAggregator) - assert.Equal(t, 60*time.Second, def.Node.LazyBlockTime.Duration) + assert.Equal(t, false, def.Node.LazyMode) + assert.Equal(t, 60*time.Second, def.Node.LazyBlockInterval.Duration) assert.Equal(t, "", def.Node.TrustedHash) assert.Equal(t, "file", def.Signer.SignerType) assert.Equal(t, "config", def.Signer.SignerPath) @@ -57,9 +57,9 @@ func TestAddFlags(t *testing.T) { assertFlagValue(t, flags, FlagLight, DefaultConfig.Node.Light) assertFlagValue(t, flags, FlagBlockTime, DefaultConfig.Node.BlockTime.Duration) assertFlagValue(t, flags, FlagTrustedHash, DefaultConfig.Node.TrustedHash) - assertFlagValue(t, flags, FlagLazyAggregator, DefaultConfig.Node.LazyAggregator) + assertFlagValue(t, flags, FlagLazyAggregator, DefaultConfig.Node.LazyMode) assertFlagValue(t, flags, FlagMaxPendingBlocks, DefaultConfig.Node.MaxPendingBlocks) - assertFlagValue(t, flags, FlagLazyBlockTime, DefaultConfig.Node.LazyBlockTime.Duration) + assertFlagValue(t, flags, FlagLazyBlockTime, DefaultConfig.Node.LazyBlockInterval.Duration) // DA flags assertFlagValue(t, flags, FlagDAAddress, DefaultConfig.DA.Address) @@ -232,7 +232,7 @@ signer: cmd.SetArgs([]string{ "--home=" + tempDir, "--rollkit.da.gas_price=0.5", - "--rollkit.node.lazy_aggregator=true", + "--rollkit.node.lazy_mode=true", }) err = cmd.Execute() require.NoError(t, err) @@ -245,7 +245,7 @@ signer: v := viper.New() v.Set(FlagRootDir, tempDir) v.Set("rollkit.da.gas_price", "0.5") - v.Set("rollkit.node.lazy_aggregator", true) + 
v.Set("rollkit.node.lazy_mode", true) // Load configuration using the new LoadFromViper method cfgFromViper, err := LoadFromViper(v) @@ -254,7 +254,7 @@ signer: // Compare the results - they should be identical require.Equal(t, cfgFromLoad.RootDir, cfgFromViper.RootDir, "RootDir should match") require.Equal(t, cfgFromLoad.DA.GasPrice, cfgFromViper.DA.GasPrice, "DA.GasPrice should match") - require.Equal(t, cfgFromLoad.Node.LazyAggregator, cfgFromViper.Node.LazyAggregator, "Node.LazyAggregator should match") + require.Equal(t, cfgFromLoad.Node.LazyMode, cfgFromViper.Node.LazyMode, "Node.LazyAggregator should match") require.Equal(t, cfgFromLoad.Node.Aggregator, cfgFromViper.Node.Aggregator, "Node.Aggregator should match") require.Equal(t, cfgFromLoad.Node.BlockTime, cfgFromViper.Node.BlockTime, "Node.BlockTime should match") require.Equal(t, cfgFromLoad.DA.Address, cfgFromViper.DA.Address, "DA.Address should match") diff --git a/pkg/config/defaults.go b/pkg/config/defaults.go index b3d52cd521..7e00021b0b 100644 --- a/pkg/config/defaults.go +++ b/pkg/config/defaults.go @@ -45,12 +45,12 @@ var DefaultConfig = Config{ Peers: "", }, Node: NodeConfig{ - Aggregator: false, - BlockTime: DurationWrapper{1 * time.Second}, - LazyAggregator: false, - LazyBlockTime: DurationWrapper{60 * time.Second}, - Light: false, - TrustedHash: "", + Aggregator: false, + BlockTime: DurationWrapper{1 * time.Second}, + LazyMode: false, + LazyBlockInterval: DurationWrapper{60 * time.Second}, + Light: false, + TrustedHash: "", }, DA: DAConfig{ Address: "http://localhost:7980", diff --git a/specs/lazy-adr/adr-021-lazy-aggregation.md b/specs/lazy-adr/adr-021-lazy-aggregation.md index 4ff0192bb2..b41fc21feb 100644 --- a/specs/lazy-adr/adr-021-lazy-aggregation.md +++ b/specs/lazy-adr/adr-021-lazy-aggregation.md @@ -4,6 +4,7 @@ - 2024-01-24: Initial draft - 2024-01-24: Revised to use existing empty batch mechanism +- 2024-01-25: Updated with implementation details from aggregation.go ## Context @@ 
-19,65 +20,122 @@ Leverage the existing empty batch mechanism and `dataHashForEmptyTxs` to maintai 1. **Modified Batch Retrieval**: + The batch retrieval mechanism has been modified to handle empty batches differently. Instead of discarding empty batches, we now return them with the ErrNoBatch error, allowing the caller to create empty blocks with proper timestamps. This ensures that block timing remains consistent even during periods of inactivity. + ```go + func (m *Manager) retrieveBatch(ctx context.Context) (*BatchData, error) { + res, err := m.sequencer.GetNextBatch(ctx, req) + if err != nil { + return nil, err + } + + if res != nil && res.Batch != nil { + m.logger.Debug("Retrieved batch", + "txCount", len(res.Batch.Transactions), + "timestamp", res.Timestamp) + + // Even if there are no transactions, return the batch with timestamp + // This allows empty blocks to maintain proper timing + if len(res.Batch.Transactions) == 0 { + return &BatchData{ + Batch: res.Batch, + Time: res.Timestamp, + Data: res.BatchData, + }, ErrNoBatch + } + h := convertBatchDataToBytes(res.BatchData) + if err := m.store.SetMetadata(ctx, LastBatchDataKey, h); err != nil { + m.logger.Error("error while setting last batch hash", "error", err) + } + m.lastBatchData = res.BatchData + return &BatchData{Batch: res.Batch, Time: res.Timestamp, Data: res.BatchData}, nil + } + return nil, ErrNoBatch + } + ``` + +2. **Empty Block Creation**: + + The block publishing logic has been enhanced to create empty blocks when a batch with no transactions is received. This uses the special `dataHashForEmptyTxs` value to indicate an empty batch, maintaining the block height consistency with the DA layer while minimizing overhead. 
-func (m *Manager) retrieveBatch(ctx context.Context) (*BatchData, error) { - res, err := m.sequencer.GetBatch(ctx) - if err != nil { - return nil, err - } - - if res != nil && res.Batch != nil { - // Even if there are no transactions, return the batch with timestamp - // This allows empty blocks to maintain proper timing - if len(res.Batch.Transactions) == 0 { - return &BatchData{ - Batch: res.Batch, - Time: res.Timestamp, - Data: res.BatchData, - }, ErrNoBatch - } - return &BatchData{ - Batch: res.Batch, - Time: res.Timestamp, - Data: res.BatchData, - }, nil - } - return nil, ErrNoBatch -} + ```go + // In publishBlockInternal + batchData, err := m.retrieveBatch(ctx) + if errors.Is(err, ErrNoBatch) { + // Even with no transactions, we still want to create a block with an empty batch + if batchData != nil { + m.logger.Info("Creating empty block", "height", newHeight) + + // createBlock sets DataHash to dataHashForEmptyTxs when the batch has no transactions + header, data, err = m.createBlock(ctx, newHeight, lastSignature, lastHeaderHash, batchData) + if err != nil { + return err + } + + err = m.store.SaveBlockData(ctx, header, data, &signature) + if err != nil { + return SaveBlockError{err} + } + } else { + // If we don't have a batch at all (not even an empty one), skip block production + m.logger.Info("No batch retrieved from sequencer, skipping block production") + return nil + } } ``` -2. **Header Creation for Empty Blocks**: +3. **Lazy Aggregation Loop**: + + A dedicated lazy aggregation loop has been implemented with dual timer mechanisms. The `lazyTimer` ensures blocks are produced at regular intervals even during network inactivity, while the `blockTimer` handles normal block production when transactions are available. This approach provides deterministic block production while optimizing for transaction inclusion latency.
```go - func (m *Manager) createHeader(ctx context.Context, batchData *BatchData) (*types.Header, error) { - // Use batch timestamp even for empty blocks - timestamp := batchData.Time - - if batchData.Transactions == nil || len(batchData.Transactions) == 0 { - // Use dataHashForEmptyTxs to indicate empty batch - return &types.Header{ - DataHash: dataHashForEmptyTxs, - Timestamp: timestamp, - // ... other header fields - }, nil + func (m *Manager) lazyAggregationLoop(ctx context.Context, blockTimer *time.Timer) { + // lazyTimer triggers block publication even during inactivity + lazyTimer := time.NewTimer(0) + defer lazyTimer.Stop() + + for { + select { + case <-ctx.Done(): + return + + case <-lazyTimer.C: + m.logger.Debug("Lazy timer triggered block production") + m.produceBlock(ctx, "lazy_timer", lazyTimer, blockTimer) + + case <-blockTimer.C: + m.logger.Debug("Block timer triggered block production") + if m.txsAvailable { + m.produceBlock(ctx, "block_timer", lazyTimer, blockTimer) + } + m.txsAvailable = false + + case <-m.txNotifyCh: + m.txsAvailable = true + } } - // Normal header creation for non-empty blocks - // ... } ``` -3. **Block Syncing**: -The existing sync mechanism already handles empty blocks correctly: +4. **Block Production**: + + The block production function centralizes the logic for publishing blocks and resetting timers. It records the start time, attempts to publish a block, and then intelligently resets both timers based on the elapsed time. This ensures that block production remains on schedule even if the block creation process takes significant time. ```go - func (m *Manager) handleEmptyDataHash(ctx context.Context, header *types.Header) { - // Existing code that handles headers with dataHashForEmptyTxs - // This allows nodes to sync empty blocks without waiting for data - if bytes.Equal(header.DataHash, dataHashForEmptyTxs) { - // ... 
existing empty block handling + func (m *Manager) produceBlock(ctx context.Context, trigger string, lazyTimer, blockTimer *time.Timer) { + // Record the start time + start := time.Now() + + // Attempt to publish the block + if err := m.publishBlock(ctx); err != nil && ctx.Err() == nil { + m.logger.Error("error while publishing block", "trigger", trigger, "error", err) + } else { + m.logger.Debug("Successfully published block", "trigger", trigger) } + + // Reset both timers for the next aggregation window + lazyTimer.Reset(getRemainingSleep(start, m.config.Node.LazyBlockInterval.Duration)) + blockTimer.Reset(getRemainingSleep(start, m.config.Node.BlockTime.Duration)) } ``` @@ -86,29 +144,22 @@ The existing sync mechanism already handles empty blocks correctly: 1. Return batch with timestamp even when empty, allowing proper block timing 2. Use existing `dataHashForEmptyTxs` for empty block indication 3. Leverage current sync mechanisms that already handle empty blocks -4. No new data structures or special transactions needed +4. Implement a dedicated lazy aggregation loop with two timers: + - `blockTimer`: Triggers block production at regular intervals when transactions are available + - `lazyTimer`: Ensures blocks are produced even during periods of inactivity +5. Maintain transaction availability tracking via the `txsAvailable` flag and notification channel ### Efficiency Considerations - Minimal DA layer overhead for empty blocks - Reuses existing empty block detection mechanism - Maintains proper block timing using batch timestamps - -### Testing Strategy - -1. **Unit Tests**: - - Empty batch handling - - Timestamp preservation - - Block height consistency - -2. 
**Integration Tests**: - - DA layer block mapping - - Empty block syncing - - Block timing verification +- Intelligent timer management to account for block production time +- Non-blocking transaction notification channel to prevent backpressure ## Status -Proposed +Implemented ## Consequences @@ -118,17 +169,21 @@ Proposed - Leverages existing empty block mechanisms - Simpler implementation than sentinel-based approach - Preserves proper block timing +- Provides deterministic block production even during network inactivity +- Reduces latency for transaction inclusion during active periods ### Negative - Small DA layer overhead for empty blocks +- Additional complexity in timer management ### Neutral - Requires careful handling of batch timestamps +- Maintains backward compatibility with existing Rollkit deployments ## References - [Block Manager Implementation](block/manager.go) -- [Block Sync Implementation](block/sync.go) -- [Existing Empty Block Handling](block/sync.go#L170) +- [Block Aggregation Implementation](block/aggregation.go) +- [Lazy Aggregation Tests](block/lazy_aggregation_test.go) From 4219dd261c97c6e6629ab90571550bf37290fde4 Mon Sep 17 00:00:00 2001 From: tac0turtle Date: Fri, 2 May 2025 17:34:18 +0200 Subject: [PATCH 06/21] remove unused --- block/manager.go | 63 +++--------------------------------------------- 1 file changed, 4 insertions(+), 59 deletions(-) diff --git a/block/manager.go b/block/manager.go index c3ebad7c6c..9e5b33a0ce 100644 --- a/block/manager.go +++ b/block/manager.go @@ -729,7 +729,7 @@ func (m *Manager) createBlock(ctx context.Context, height uint64, lastSignature } // Determine if this is an empty block - isEmpty := batchData.Batch == nil || len(batchData.Batch.Transactions) == 0 + isEmpty := batchData.Batch == nil || len(batchData.Transactions) == 0 // Set the appropriate data hash based on whether this is an empty block var dataHash types.Hash @@ -770,9 +770,9 @@ func (m *Manager) createBlock(ctx context.Context, height 
uint64, lastSignature // Only add transactions if this is not an empty block if !isEmpty { - blockData.Txs = make(types.Txs, len(batchData.Batch.Transactions)) - for i := range batchData.Batch.Transactions { - blockData.Txs[i] = types.Tx(batchData.Batch.Transactions[i]) + blockData.Txs = make(types.Txs, len(batchData.Transactions)) + for i := range batchData.Transactions { + blockData.Txs[i] = types.Tx(batchData.Transactions[i]) } } @@ -795,61 +795,6 @@ func (m *Manager) execCommit(ctx context.Context, newState types.State, h *types return newState.AppHash, err } -func (m *Manager) execCreateBlock(_ context.Context, height uint64, lastSignature *types.Signature, lastHeaderHash types.Hash, lastState types.State, batchData *BatchData) (*types.SignedHeader, *types.Data, error) { - batchDataIDs := convertBatchDataToBytes(batchData.Data) - - if m.signer == nil { - return nil, nil, fmt.Errorf("signer is nil; cannot create block") - } - - key, err := m.signer.GetPublic() - if err != nil { - return nil, nil, fmt.Errorf("failed to get proposer public key: %w", err) - } - - // check that the proposer address is the same as the genesis proposer address - address, err := m.signer.GetAddress() - if err != nil { - return nil, nil, fmt.Errorf("failed to get proposer address: %w", err) - } - if !bytes.Equal(m.genesis.ProposerAddress, address) { - return nil, nil, fmt.Errorf("proposer address is not the same as the genesis proposer address %x != %x", address, m.genesis.ProposerAddress) - } - - header := &types.SignedHeader{ - Header: types.Header{ - Version: types.Version{ - Block: lastState.Version.Block, - App: lastState.Version.App, - }, - BaseHeader: types.BaseHeader{ - ChainID: lastState.ChainID, - Height: height, - Time: uint64(batchData.UnixNano()), //nolint:gosec // why is time unix? 
(tac0turtle) - }, - LastHeaderHash: lastHeaderHash, - DataHash: batchDataIDs, - ConsensusHash: make(types.Hash, 32), - AppHash: lastState.AppHash, - ProposerAddress: m.genesis.ProposerAddress, - }, - Signature: *lastSignature, - Signer: types.Signer{ - PubKey: key, - Address: m.genesis.ProposerAddress, - }, - } - - blockData := &types.Data{ - Txs: make(types.Txs, len(batchData.Transactions)), - } - for i := range batchData.Transactions { - blockData.Txs[i] = types.Tx(batchData.Transactions[i]) - } - - return header, blockData, nil -} - func (m *Manager) execApplyBlock(ctx context.Context, lastState types.State, header *types.SignedHeader, data *types.Data) (types.State, error) { rawTxs := make([][]byte, len(data.Txs)) for i := range data.Txs { From 4949f09aeecb2fac627259f1e1f7bfcb5d879f81 Mon Sep 17 00:00:00 2001 From: tac0turtle Date: Fri, 2 May 2025 17:43:51 +0200 Subject: [PATCH 07/21] reset block timer when no txs are present --- block/aggregation.go | 7 ++++--- block/manager.go | 17 ++++++++++------- 2 files changed, 14 insertions(+), 10 deletions(-) diff --git a/block/aggregation.go b/block/aggregation.go index c26a3d0503..6a56a7fca3 100644 --- a/block/aggregation.go +++ b/block/aggregation.go @@ -59,12 +59,13 @@ func (m *Manager) lazyAggregationLoop(ctx context.Context, blockTimer *time.Time m.produceBlock(ctx, "lazy_timer", lazyTimer, blockTimer) case <-blockTimer.C: - m.logger.Debug("Block timer triggered block production") if m.txsAvailable { m.produceBlock(ctx, "block_timer", lazyTimer, blockTimer) + m.txsAvailable = false + } else { + // Ensure we keep ticking even when there are no txs + blockTimer.Reset(m.config.Node.BlockTime.Duration) } - m.txsAvailable = false - case <-m.txNotifyCh: m.txsAvailable = true } diff --git a/block/manager.go b/block/manager.go index 9e5b33a0ce..1e66365421 100644 --- a/block/manager.go +++ b/block/manager.go @@ -470,20 +470,23 @@ func (m *Manager) retrieveBatch(ctx context.Context) (*BatchData, error) { // Even if there 
are no transactions, return the batch with timestamp // This allows empty blocks to maintain proper timing if len(res.Batch.Transactions) == 0 { + // Even if there are no transactions, update lastBatchData so we don’t + // repeatedly emit the same empty batch, and persist it to metadata. + _ = m.store.SetMetadata(ctx, LastBatchDataKey, convertBatchDataToBytes(res.BatchData)) + m.lastBatchData = res.BatchData return &BatchData{ Batch: res.Batch, Time: res.Timestamp, Data: res.BatchData, }, ErrNoBatch } - h := convertBatchDataToBytes(res.BatchData) - if err := m.store.SetMetadata(ctx, LastBatchDataKey, h); err != nil { - m.logger.Error("error while setting last batch hash", "error", err) - } - m.lastBatchData = res.BatchData - return &BatchData{Batch: res.Batch, Time: res.Timestamp, Data: res.BatchData}, nil } - return nil, ErrNoBatch + h := convertBatchDataToBytes(res.BatchData) + if err := m.store.SetMetadata(ctx, LastBatchDataKey, h); err != nil { + m.logger.Error("error while setting last batch hash", "error", err) + } + m.lastBatchData = res.BatchData + return &BatchData{Batch: res.Batch, Time: res.Timestamp, Data: res.BatchData}, nil } func (m *Manager) isUsingExpectedSingleSequencer(header *types.SignedHeader) bool { From 95897a4c3131f7b42ceb8ef4fe9b038b51350ebb Mon Sep 17 00:00:00 2001 From: Manav Aggarwal Date: Fri, 2 May 2025 17:36:16 +0100 Subject: [PATCH 08/21] Simplify --- block/manager.go | 76 +++++++++------------- specs/lazy-adr/adr-021-lazy-aggregation.md | 72 ++++++++++---------- 2 files changed, 68 insertions(+), 80 deletions(-) diff --git a/block/manager.go b/block/manager.go index 1e66365421..8dda3a8adb 100644 --- a/block/manager.go +++ b/block/manager.go @@ -467,26 +467,21 @@ func (m *Manager) retrieveBatch(ctx context.Context) (*BatchData, error) { "txCount", len(res.Batch.Transactions), "timestamp", res.Timestamp) + var errRetrieveBatch error // Even if there are no transactions, return the batch with timestamp // This allows empty blocks to 
maintain proper timing if len(res.Batch.Transactions) == 0 { - // Even if there are no transactions, update lastBatchData so we don’t - // repeatedly emit the same empty batch, and persist it to metadata. - _ = m.store.SetMetadata(ctx, LastBatchDataKey, convertBatchDataToBytes(res.BatchData)) - m.lastBatchData = res.BatchData - return &BatchData{ - Batch: res.Batch, - Time: res.Timestamp, - Data: res.BatchData, - }, ErrNoBatch + errRetrieveBatch = ErrNoBatch } + // Even if there are no transactions, update lastBatchData so we don’t + // repeatedly emit the same empty batch, and persist it to metadata. + if err := m.store.SetMetadata(ctx, LastBatchDataKey, convertBatchDataToBytes(res.BatchData)); err != nil { + m.logger.Error("error while setting last batch hash", "error", err) + } + m.lastBatchData = res.BatchData + return &BatchData{Batch: res.Batch, Time: res.Timestamp, Data: res.BatchData}, errRetrieveBatch } - h := convertBatchDataToBytes(res.BatchData) - if err := m.store.SetMetadata(ctx, LastBatchDataKey, h); err != nil { - m.logger.Error("error while setting last batch hash", "error", err) - } - m.lastBatchData = res.BatchData - return &BatchData{Batch: res.Batch, Time: res.Timestamp, Data: res.BatchData}, nil + return nil, ErrNoBatch } func (m *Manager) isUsingExpectedSingleSequencer(header *types.SignedHeader) bool { @@ -517,7 +512,7 @@ func (m *Manager) publishBlockInternal(ctx context.Context) error { height, err := m.store.Height(ctx) if err != nil { - return fmt.Errorf("error while getting height: %w", err) + return fmt.Errorf("error while getting store height: %w", err) } newHeight := height + 1 @@ -554,45 +549,31 @@ func (m *Manager) publishBlockInternal(ctx context.Context) error { data = pendingData } else { batchData, err := m.retrieveBatch(ctx) - if errors.Is(err, ErrNoBatch) { - // Even with no transactions, we still want to create a block with an empty batch - if batchData != nil { - m.logger.Info("Creating empty block", "height", newHeight) - 
- // Create an empty block - header, data, err = m.createBlock(ctx, newHeight, lastSignature, lastHeaderHash, batchData) - if err != nil { - return err - } - - err = m.store.SaveBlockData(ctx, header, data, &signature) - if err != nil { - return SaveBlockError{err} + if err != nil { + if errors.Is(err, ErrNoBatch) { + if batchData == nil { + m.logger.Info("No batch retrieved from sequencer, skipping block production") + return nil } + m.logger.Info("Creating empty block", "height", newHeight) } else { - // If we don't have a batch at all (not even an empty one), skip block production - m.logger.Info("No batch retrieved from sequencer, skipping block production") - return nil + return fmt.Errorf("failed to get transactions from batch: %w", err) } - } else if err != nil { - return fmt.Errorf("failed to get transactions from batch: %w", err) } else { - // We have a batch with transactions - // sanity check timestamp for monotonically increasing if batchData.Before(lastHeaderTime) { return fmt.Errorf("timestamp is not monotonically increasing: %s < %s", batchData.Time, m.getLastBlockTime()) } m.logger.Info("Creating and publishing block", "height", newHeight) - header, data, err = m.createBlock(ctx, newHeight, lastSignature, lastHeaderHash, batchData) - if err != nil { - return err - } - m.logger.Debug("block info", "num_tx", len(data.Txs)) + m.logger.Debug("block info", "num_tx", len(batchData.Batch.Transactions)) + } - err = m.store.SaveBlockData(ctx, header, data, &signature) - if err != nil { - return SaveBlockError{err} - } + header, data, err = m.createBlock(ctx, newHeight, lastSignature, lastHeaderHash, batchData) + if err != nil { + return err + } + + if err = m.store.SaveBlockData(ctx, header, data, &signature); err != nil { + return SaveBlockError{err} } } @@ -712,7 +693,10 @@ func (m *Manager) getLastBlockTime() time.Time { func (m *Manager) createBlock(ctx context.Context, height uint64, lastSignature *types.Signature, lastHeaderHash types.Hash, batchData 
*BatchData) (*types.SignedHeader, *types.Data, error) { m.lastStateMtx.RLock() defer m.lastStateMtx.RUnlock() + return m.execCreateBlock(ctx, height, lastSignature, lastHeaderHash, m.lastState, batchData) +} +func (m *Manager) execCreateBlock(ctx context.Context, height uint64, lastSignature *types.Signature, lastHeaderHash types.Hash, lastState types.State, batchData *BatchData) (*types.SignedHeader, *types.Data, error) { if m.signer == nil { return nil, nil, fmt.Errorf("signer is nil; cannot create block") } diff --git a/specs/lazy-adr/adr-021-lazy-aggregation.md b/specs/lazy-adr/adr-021-lazy-aggregation.md index b41fc21feb..f9d428a065 100644 --- a/specs/lazy-adr/adr-021-lazy-aggregation.md +++ b/specs/lazy-adr/adr-021-lazy-aggregation.md @@ -34,23 +34,21 @@ Leverage the existing empty batch mechanism and `dataHashForEmptyTxs` to maintai "txCount", len(res.Batch.Transactions), "timestamp", res.Timestamp) + var errRetrieveBatch error // Even if there are no transactions, return the batch with timestamp // This allows empty blocks to maintain proper timing if len(res.Batch.Transactions) == 0 { - return &BatchData{ - Batch: res.Batch, - Time: res.Timestamp, - Data: res.BatchData, - }, ErrNoBatch + errRetrieveBatch = ErrNoBatch } - h := convertBatchDataToBytes(res.BatchData) - if err := m.store.SetMetadata(ctx, LastBatchDataKey, h); err != nil { + // Even if there are no transactions, update lastBatchData so we don’t + // repeatedly emit the same empty batch, and persist it to metadata. 
+ if err := m.store.SetMetadata(ctx, LastBatchDataKey, convertBatchDataToBytes(res.BatchData)); err != nil { m.logger.Error("error while setting last batch hash", "error", err) } m.lastBatchData = res.BatchData - return &BatchData{Batch: res.Batch, Time: res.Timestamp, Data: res.BatchData}, nil - } - return nil, ErrNoBatch + return &BatchData{Batch: res.Batch, Time: res.Timestamp, Data: res.BatchData}, errRetrieveBatch + } + return nil, ErrNoBatch } ``` @@ -61,27 +59,32 @@ Leverage the existing empty batch mechanism and `dataHashForEmptyTxs` to maintai ```go // In publishBlock method batchData, err := m.retrieveBatch(ctx) - if errors.Is(err, ErrNoBatch) { - // Even with no transactions, we still want to create a block with an empty batch - if batchData != nil { - m.logger.Info("Creating empty block", "height", newHeight) - - // For empty blocks, use dataHashForEmptyTxs to indicate empty batch - header, data, err = m.createEmptyBlock(ctx, newHeight, lastSignature, lastHeaderHash, batchData) - if err != nil { - return err - } - - err = m.store.SaveBlockData(ctx, header, data, &signature) - if err != nil { - return SaveBlockError{err} - } - } else { - // If we don't have a batch at all (not even an empty one), skip block production - m.logger.Info("No batch retrieved from sequencer, skipping block production") - return nil - } - } + if err != nil { + if errors.Is(err, ErrNoBatch) { + if batchData == nil { + m.logger.Info("No batch retrieved from sequencer, skipping block production") + return nil + } + m.logger.Info("Creating empty block", "height", newHeight) + } else { + return fmt.Errorf("failed to get transactions from batch: %w", err) + } + } else { + if batchData.Before(lastHeaderTime) { + return fmt.Errorf("timestamp is not monotonically increasing: %s < %s", batchData.Time, m.getLastBlockTime()) + } + m.logger.Info("Creating and publishing block", "height", newHeight) + m.logger.Debug("block info", "num_tx", len(batchData.Batch.Transactions)) + } + + header, 
data, err = m.createBlock(ctx, newHeight, lastSignature, lastHeaderHash, batchData) + if err != nil { + return err + } + + if err = m.store.SaveBlockData(ctx, header, data, &signature); err != nil { + return SaveBlockError{err} + } ``` 3. **Lazy Aggregation Loop**: @@ -104,12 +107,13 @@ Leverage the existing empty batch mechanism and `dataHashForEmptyTxs` to maintai m.produceBlock(ctx, "lazy_timer", lazyTimer, blockTimer) case <-blockTimer.C: - m.logger.Debug("Block timer triggered block production") if m.txsAvailable { m.produceBlock(ctx, "block_timer", lazyTimer, blockTimer) + m.txsAvailable = false + } else { + // Ensure we keep ticking even when there are no txs + blockTimer.Reset(m.config.Node.BlockTime.Duration) } - m.txsAvailable = false - case <-m.txNotifyCh: m.txsAvailable = true } From 0ea76571d7d7e3b83549f29acaa96fcc4cf2d6b1 Mon Sep 17 00:00:00 2001 From: Manav Aggarwal Date: Fri, 2 May 2025 17:44:09 +0100 Subject: [PATCH 09/21] Fix test --- block/publish_block_test.go | 3 +++ 1 file changed, 3 insertions(+) diff --git a/block/publish_block_test.go b/block/publish_block_test.go index 47608f020c..a52f50b89b 100644 --- a/block/publish_block_test.go +++ b/block/publish_block_test.go @@ -294,6 +294,9 @@ func Test_publishBlock_EmptyBatch(t *testing.T) { }) mockSeq.On("GetNextBatch", ctx, batchReqMatcher).Return(emptyBatchResponse, nil).Once() + // Mock SetMetadata for LastBatchDataKey (required for empty batch handling) + mockStore.On("SetMetadata", ctx, "l", mock.AnythingOfType("[]uint8")).Return(nil).Once() + // With our new implementation, we should expect SaveBlockData to be called for empty blocks mockStore.On("SaveBlockData", ctx, mock.AnythingOfType("*types.SignedHeader"), mock.AnythingOfType("*types.Data"), mock.AnythingOfType("*types.Signature")).Return(nil).Once() From 4d4d9d27fb592fdaf8213cd33ad3cb526b8576c4 Mon Sep 17 00:00:00 2001 From: Manav Aggarwal Date: Fri, 2 May 2025 17:55:49 +0100 Subject: [PATCH 10/21] Add reaper logic in adr --- 
specs/lazy-adr/adr-021-lazy-aggregation.md | 20 ++++++++++++++++++-- 1 file changed, 18 insertions(+), 2 deletions(-) diff --git a/specs/lazy-adr/adr-021-lazy-aggregation.md b/specs/lazy-adr/adr-021-lazy-aggregation.md index f9d428a065..808b65f0f8 100644 --- a/specs/lazy-adr/adr-021-lazy-aggregation.md +++ b/specs/lazy-adr/adr-021-lazy-aggregation.md @@ -40,7 +40,7 @@ Leverage the existing empty batch mechanism and `dataHashForEmptyTxs` to maintai if len(res.Batch.Transactions) == 0 { errRetrieveBatch = ErrNoBatch } - // Even if there are no transactions, update lastBatchData so we don’t + // Even if there are no transactions, update lastBatchData so we don't // repeatedly emit the same empty batch, and persist it to metadata. if err := m.store.SetMetadata(ctx, LastBatchDataKey, convertBatchDataToBytes(res.BatchData)); err != nil { m.logger.Error("error while setting last batch hash", "error", err) @@ -89,9 +89,25 @@ Leverage the existing empty batch mechanism and `dataHashForEmptyTxs` to maintai 3. **Lazy Aggregation Loop**: - A dedicated lazy aggregation loop has been implemented with dual timer mechanisms. The `lazyTimer` ensures blocks are produced at regular intervals even during network inactivity, while the `blockTimer` handles normal block production when transactions are available. This approach provides deterministic block production while optimizing for transaction inclusion latency. + A dedicated lazy aggregation loop has been implemented with dual timer mechanisms. The `lazyTimer` ensures blocks are produced at regular intervals even during network inactivity, while the `blockTimer` handles normal block production when transactions are available. Transaction notifications from the `Reaper` to the `Manager` are now handled via the `txNotifyCh` channel: when the `Reaper` detects new transactions, it calls `Manager.NotifyNewTransactions()`, which performs a non-blocking signal on this channel. 
See the tests in `block/lazy_aggregation_test.go` for verification of this behavior. ```go + // In Reaper.SubmitTxs + if r.manager != nil && len(newTxs) > 0 { + r.logger.Debug("Notifying manager of new transactions") + r.manager.NotifyNewTransactions() // Signals txNotifyCh + } + + // In Manager.NotifyNewTransactions + func (m *Manager) NotifyNewTransactions() { + select { + case m.txNotifyCh <- struct{}{}: + // Successfully sent notification + default: + // Channel buffer is full, notification already pending + } + } + // Modiified lazyAggregationLoop func (m *Manager) lazyAggregationLoop(ctx context.Context, blockTimer *time.Timer) { // lazyTimer triggers block publication even during inactivity lazyTimer := time.NewTimer(0) From 86c62bd280eb50cadbc385b6cdc2af2007e35ed0 Mon Sep 17 00:00:00 2001 From: Marko Date: Fri, 2 May 2025 19:52:32 +0200 Subject: [PATCH 11/21] Update adr-021-lazy-aggregation.md Co-authored-by: graphite-app[bot] <96075541+graphite-app[bot]@users.noreply.github.com> --- specs/lazy-adr/adr-021-lazy-aggregation.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/specs/lazy-adr/adr-021-lazy-aggregation.md b/specs/lazy-adr/adr-021-lazy-aggregation.md index 808b65f0f8..40543440cb 100644 --- a/specs/lazy-adr/adr-021-lazy-aggregation.md +++ b/specs/lazy-adr/adr-021-lazy-aggregation.md @@ -107,7 +107,7 @@ Leverage the existing empty batch mechanism and `dataHashForEmptyTxs` to maintai // Channel buffer is full, notification already pending } } - // Modiified lazyAggregationLoop + // Modified lazyAggregationLoop func (m *Manager) lazyAggregationLoop(ctx context.Context, blockTimer *time.Timer) { // lazyTimer triggers block publication even during inactivity lazyTimer := time.NewTimer(0) From fddf780fb8866178ff3d9f07689a7c7bd35b655c Mon Sep 17 00:00:00 2001 From: tac0turtle Date: Fri, 2 May 2025 20:55:53 +0200 Subject: [PATCH 12/21] move normal aggregation to aggration file --- block/aggregation.go | 63 
++++++++++++++++++++++++++++++++++ block/lazy_aggregation_test.go | 59 ------------------------------- 2 files changed, 63 insertions(+), 59 deletions(-) diff --git a/block/aggregation.go b/block/aggregation.go index 6a56a7fca3..032cc85b0e 100644 --- a/block/aggregation.go +++ b/block/aggregation.go @@ -2,7 +2,11 @@ package block import ( "context" + "sync" + "testing" "time" + + "github.com/stretchr/testify/require" ) // AggregationLoop is responsible for aggregating transactions into rollup-blocks. @@ -123,3 +127,62 @@ func getRemainingSleep(start time.Time, interval time.Duration) time.Duration { return time.Millisecond } + +// TestNormalAggregationLoop_TxNotification tests that transaction notifications are handled in normal mode +func TestNormalAggregationLoop_TxNotification(t *testing.T) { + require := require.New(t) + + blockTime := 100 * time.Millisecond + m, pubMock := setupTestManager(t, blockTime, 0) + m.config.Node.LazyMode = false + + // Create the notification channel + m.txNotifyCh = make(chan struct{}, 1) + + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + var wg sync.WaitGroup + wg.Add(1) + go func() { + defer wg.Done() + blockTimer := time.NewTimer(blockTime) + defer blockTimer.Stop() + m.normalAggregationLoop(ctx, blockTimer) + }() + + // Wait for the first block to be published by the timer + select { + case <-pubMock.calls: + // Block was published by timer, which is expected + case <-time.After(blockTime * 2): + require.Fail("Block was not published by timer") + } + + // Reset the publish mock to track new calls + pubMock.reset() + + // Send a transaction notification + m.NotifyNewTransactions() + + // In normal mode, the notification should not trigger an immediate block + select { + case <-pubMock.calls: + // If we enable the optional enhancement to reset the timer, this might happen + // But with the current implementation, this should not happen + require.Fail("Block was published immediately after notification 
in normal mode") + case <-time.After(blockTime / 2): + // This is expected - no immediate block + } + + // Wait for the next regular block + select { + case <-pubMock.calls: + // Block was published by timer, which is expected + case <-time.After(blockTime * 2): + require.Fail("Block was not published by timer after notification") + } + + cancel() + wg.Wait() +} diff --git a/block/lazy_aggregation_test.go b/block/lazy_aggregation_test.go index 3c2ff77985..38f9783b1f 100644 --- a/block/lazy_aggregation_test.go +++ b/block/lazy_aggregation_test.go @@ -342,62 +342,3 @@ func TestEmptyBlockCreation(t *testing.T) { require.NotNil(capturedCtx, "Context should have been captured by mock publish function") require.Equal(ctx, capturedCtx, "Context should match the one passed to produceBlock") } - -// TestNormalAggregationLoop_TxNotification tests that transaction notifications are handled in normal mode -func TestNormalAggregationLoop_TxNotification(t *testing.T) { - require := require.New(t) - - blockTime := 100 * time.Millisecond - m, pubMock := setupTestManager(t, blockTime, 0) - m.config.Node.LazyMode = false - - // Create the notification channel - m.txNotifyCh = make(chan struct{}, 1) - - ctx, cancel := context.WithCancel(context.Background()) - defer cancel() - - var wg sync.WaitGroup - wg.Add(1) - go func() { - defer wg.Done() - blockTimer := time.NewTimer(blockTime) - defer blockTimer.Stop() - m.normalAggregationLoop(ctx, blockTimer) - }() - - // Wait for the first block to be published by the timer - select { - case <-pubMock.calls: - // Block was published by timer, which is expected - case <-time.After(blockTime * 2): - require.Fail("Block was not published by timer") - } - - // Reset the publish mock to track new calls - pubMock.reset() - - // Send a transaction notification - m.NotifyNewTransactions() - - // In normal mode, the notification should not trigger an immediate block - select { - case <-pubMock.calls: - // If we enable the optional enhancement to 
reset the timer, this might happen - // But with the current implementation, this should not happen - require.Fail("Block was published immediately after notification in normal mode") - case <-time.After(blockTime / 2): - // This is expected - no immediate block - } - - // Wait for the next regular block - select { - case <-pubMock.calls: - // Block was published by timer, which is expected - case <-time.After(blockTime * 2): - require.Fail("Block was not published by timer after notification") - } - - cancel() - wg.Wait() -} From bb80260f054303e67885431baf1673f0e01b3126 Mon Sep 17 00:00:00 2001 From: Marko Date: Fri, 2 May 2025 21:02:09 +0200 Subject: [PATCH 13/21] Update pkg/config/config_test.go Co-authored-by: graphite-app[bot] <96075541+graphite-app[bot]@users.noreply.github.com> --- pkg/config/config_test.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pkg/config/config_test.go b/pkg/config/config_test.go index dbd870c890..7fd6c0704f 100644 --- a/pkg/config/config_test.go +++ b/pkg/config/config_test.go @@ -254,7 +254,7 @@ signer: // Compare the results - they should be identical require.Equal(t, cfgFromLoad.RootDir, cfgFromViper.RootDir, "RootDir should match") require.Equal(t, cfgFromLoad.DA.GasPrice, cfgFromViper.DA.GasPrice, "DA.GasPrice should match") - require.Equal(t, cfgFromLoad.Node.LazyMode, cfgFromViper.Node.LazyMode, "Node.LazyAggregator should match") + require.Equal(t, cfgFromLoad.Node.LazyMode, cfgFromViper.Node.LazyMode, "Node.LazyMode should match") require.Equal(t, cfgFromLoad.Node.Aggregator, cfgFromViper.Node.Aggregator, "Node.Aggregator should match") require.Equal(t, cfgFromLoad.Node.BlockTime, cfgFromViper.Node.BlockTime, "Node.BlockTime should match") require.Equal(t, cfgFromLoad.DA.Address, cfgFromViper.DA.Address, "DA.Address should match") From bed358cbb785349155d95e9295c7a8fc1a19e0ea Mon Sep 17 00:00:00 2001 From: tac0turtle Date: Fri, 2 May 2025 21:06:00 +0200 Subject: [PATCH 14/21] revert move --- 
block/aggregation.go | 63 ---------------------------------- block/lazy_aggregation_test.go | 59 +++++++++++++++++++++++++++++++ 2 files changed, 59 insertions(+), 63 deletions(-) diff --git a/block/aggregation.go b/block/aggregation.go index 032cc85b0e..6a56a7fca3 100644 --- a/block/aggregation.go +++ b/block/aggregation.go @@ -2,11 +2,7 @@ package block import ( "context" - "sync" - "testing" "time" - - "github.com/stretchr/testify/require" ) // AggregationLoop is responsible for aggregating transactions into rollup-blocks. @@ -127,62 +123,3 @@ func getRemainingSleep(start time.Time, interval time.Duration) time.Duration { return time.Millisecond } - -// TestNormalAggregationLoop_TxNotification tests that transaction notifications are handled in normal mode -func TestNormalAggregationLoop_TxNotification(t *testing.T) { - require := require.New(t) - - blockTime := 100 * time.Millisecond - m, pubMock := setupTestManager(t, blockTime, 0) - m.config.Node.LazyMode = false - - // Create the notification channel - m.txNotifyCh = make(chan struct{}, 1) - - ctx, cancel := context.WithCancel(context.Background()) - defer cancel() - - var wg sync.WaitGroup - wg.Add(1) - go func() { - defer wg.Done() - blockTimer := time.NewTimer(blockTime) - defer blockTimer.Stop() - m.normalAggregationLoop(ctx, blockTimer) - }() - - // Wait for the first block to be published by the timer - select { - case <-pubMock.calls: - // Block was published by timer, which is expected - case <-time.After(blockTime * 2): - require.Fail("Block was not published by timer") - } - - // Reset the publish mock to track new calls - pubMock.reset() - - // Send a transaction notification - m.NotifyNewTransactions() - - // In normal mode, the notification should not trigger an immediate block - select { - case <-pubMock.calls: - // If we enable the optional enhancement to reset the timer, this might happen - // But with the current implementation, this should not happen - require.Fail("Block was published 
immediately after notification in normal mode") - case <-time.After(blockTime / 2): - // This is expected - no immediate block - } - - // Wait for the next regular block - select { - case <-pubMock.calls: - // Block was published by timer, which is expected - case <-time.After(blockTime * 2): - require.Fail("Block was not published by timer after notification") - } - - cancel() - wg.Wait() -} diff --git a/block/lazy_aggregation_test.go b/block/lazy_aggregation_test.go index 38f9783b1f..3c2ff77985 100644 --- a/block/lazy_aggregation_test.go +++ b/block/lazy_aggregation_test.go @@ -342,3 +342,62 @@ func TestEmptyBlockCreation(t *testing.T) { require.NotNil(capturedCtx, "Context should have been captured by mock publish function") require.Equal(ctx, capturedCtx, "Context should match the one passed to produceBlock") } + +// TestNormalAggregationLoop_TxNotification tests that transaction notifications are handled in normal mode +func TestNormalAggregationLoop_TxNotification(t *testing.T) { + require := require.New(t) + + blockTime := 100 * time.Millisecond + m, pubMock := setupTestManager(t, blockTime, 0) + m.config.Node.LazyMode = false + + // Create the notification channel + m.txNotifyCh = make(chan struct{}, 1) + + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + var wg sync.WaitGroup + wg.Add(1) + go func() { + defer wg.Done() + blockTimer := time.NewTimer(blockTime) + defer blockTimer.Stop() + m.normalAggregationLoop(ctx, blockTimer) + }() + + // Wait for the first block to be published by the timer + select { + case <-pubMock.calls: + // Block was published by timer, which is expected + case <-time.After(blockTime * 2): + require.Fail("Block was not published by timer") + } + + // Reset the publish mock to track new calls + pubMock.reset() + + // Send a transaction notification + m.NotifyNewTransactions() + + // In normal mode, the notification should not trigger an immediate block + select { + case <-pubMock.calls: + // If we enable 
the optional enhancement to reset the timer, this might happen + // But with the current implementation, this should not happen + require.Fail("Block was published immediately after notification in normal mode") + case <-time.After(blockTime / 2): + // This is expected - no immediate block + } + + // Wait for the next regular block + select { + case <-pubMock.calls: + // Block was published by timer, which is expected + case <-time.After(blockTime * 2): + require.Fail("Block was not published by timer after notification") + } + + cancel() + wg.Wait() +} From 759994590bf4f3106f8cea68c72676f6363417aa Mon Sep 17 00:00:00 2001 From: tac0turtle Date: Fri, 2 May 2025 21:12:31 +0200 Subject: [PATCH 15/21] adjust docs --- specs/src/specs/block-manager.md | 18 ++++++++++++++++-- 1 file changed, 16 insertions(+), 2 deletions(-) diff --git a/specs/src/specs/block-manager.md b/specs/src/specs/block-manager.md index b60b51e27b..ab498b2653 100644 --- a/specs/src/specs/block-manager.md +++ b/specs/src/specs/block-manager.md @@ -63,7 +63,8 @@ Block manager configuration options: |BlockTime|time.Duration|time interval used for block production and block retrieval from block store ([`defaultBlockTime`][defaultBlockTime])| |DABlockTime|time.Duration|time interval used for both block publication to DA network and block retrieval from DA network ([`defaultDABlockTime`][defaultDABlockTime])| |DAStartHeight|uint64|block retrieval from DA network starts from this height| -|LazyBlockTime|time.Duration|time interval used for block production in lazy aggregator mode even when there are no transactions ([`defaultLazyBlockTime`][defaultLazyBlockTime])| +|LazyBlockInterval|time.Duration|time interval used for block production in lazy aggregator mode even when there are no transactions ([`defaultLazyBlockTime`][defaultLazyBlockTime])| +|LazyMode|bool|when set to true, enables lazy aggregation mode which produces blocks only when transactions are available or at LazyBlockInterval intervals| ### 
Block Production @@ -71,7 +72,12 @@ When the full node is operating as a sequencer (aka aggregator), the block manag In `normal` mode, the block manager runs a timer, which is set to the `BlockTime` configuration parameter, and continuously produces blocks at `BlockTime` intervals. -In `lazy` mode, the block manager starts building a block when any transaction becomes available in the mempool. After the first notification of the transaction availability, the manager will wait for a 1 second timer to finish, in order to collect as many transactions from the mempool as possible. The 1 second delay is chosen in accordance with the default block time of 1s. The block manager also notifies the full node after every lazy block building. +In `lazy` mode, the block manager implements a dual timer mechanism: + +1. A `blockTimer` that triggers block production at regular intervals when transactions are available +2. A `lazyTimer` that ensures blocks are produced at `LazyBlockInterval` intervals even during periods of inactivity + +The block manager starts building a block when any transaction becomes available in the mempool via a notification channel (`txNotifyCh`). When the `Reaper` detects new transactions, it calls `Manager.NotifyNewTransactions()`, which performs a non-blocking signal on this channel. The block manager also produces empty blocks at regular intervals to maintain consistency with the DA layer, ensuring a 1:1 mapping between DA layer blocks and execution layer blocks. #### Building the Block @@ -154,6 +160,12 @@ The communication between the full node and block manager: * The block manager loads the initial state from the local store and uses genesis if not found in the local store, when the node (re)starts. * The default mode for sequencer nodes is normal (not lazy). * The sequencer can produce empty blocks. 
+* In lazy aggregation mode, the block manager maintains consistency with the DA layer by producing empty blocks at regular intervals, ensuring a 1:1 mapping between DA layer blocks and execution layer blocks. +* The lazy aggregation mechanism uses a dual timer approach: + * A `blockTimer` that triggers block production when transactions are available + * A `lazyTimer` that ensures blocks are produced even during periods of inactivity +* Empty batches are handled differently in lazy mode - instead of discarding them, they are returned with the `ErrNoBatch` error, allowing the caller to create empty blocks with proper timestamps. +* Transaction notifications from the `Reaper` to the `Manager` are handled via a non-blocking notification channel (`txNotifyCh`) to prevent backpressure. * The block manager uses persistent storage (disk) when the `root_dir` and `db_path` configuration parameters are specified in `config.yaml` file under the app directory. If these configuration parameters are not specified, the in-memory storage is used, which will not be persistent if the node stops. * The block manager does not re-apply the block again (in other words, create a new updated state and persist it) when a block was initially applied using P2P block sync, but later was DA included during DA retrieval. The block is only marked DA included in this case. * The data sync store is created by prefixing `dataSync` on the main data store. 
@@ -187,6 +199,8 @@ See [tutorial] for running a multi-node network with both sequencer and non-sequ [8] [Data Availability](./da.md) +[9] [Lazy Aggregation with DA Layer Consistency ADR](../../lazy-adr/adr-021-lazy-aggregation.md) + [maxSubmitAttempts]: https://github.com/rollkit/rollkit/blob/main/block/manager.go#L50 [defaultBlockTime]: https://github.com/rollkit/rollkit/blob/main/block/manager.go#L36 [defaultDABlockTime]: https://github.com/rollkit/rollkit/blob/main/block/manager.go#L33 From 92a9237e84e4f9aeb3faa82d966625c4b7662cbc Mon Sep 17 00:00:00 2001 From: tac0turtle Date: Mon, 5 May 2025 12:11:25 +0200 Subject: [PATCH 16/21] fix tests --- rollups/testapp/cmd/init_test.go | 2 +- rollups/testapp/kv/kvexecutor_test.go | 3 ++- 2 files changed, 3 insertions(+), 2 deletions(-) diff --git a/rollups/testapp/cmd/init_test.go b/rollups/testapp/cmd/init_test.go index eb9aae6f75..a002e6f574 100644 --- a/rollups/testapp/cmd/init_test.go +++ b/rollups/testapp/cmd/init_test.go @@ -71,7 +71,7 @@ func TestInitCommand(t *testing.T) { require.Contains(t, yamlContent, "da:") require.Contains(t, yamlContent, "block_time: ") require.Contains(t, yamlContent, "15s") - require.Contains(t, yamlContent, "lazy_block_time: ") + require.Contains(t, yamlContent, "lazy_block_interval: ") require.Contains(t, yamlContent, "1m0s") // Verify addresses diff --git a/rollups/testapp/kv/kvexecutor_test.go b/rollups/testapp/kv/kvexecutor_test.go index a6fd24d1d6..774c8b7735 100644 --- a/rollups/testapp/kv/kvexecutor_test.go +++ b/rollups/testapp/kv/kvexecutor_test.go @@ -1,6 +1,7 @@ package executor import ( + "bytes" "context" "reflect" "strings" @@ -32,7 +33,7 @@ func TestInitChain_Idempotency(t *testing.T) { if err != nil { t.Fatalf("InitChain failed on second call: %v", err) } - if !reflect.DeepEqual(stateRoot1, stateRoot2) { + if !bytes.Equal(stateRoot1, stateRoot2) { t.Errorf("Genesis state roots do not match: %s vs %s", stateRoot1, stateRoot2) } if maxBytes2 != 1024 { From 
43f69368deb63ceae98b6b6c74c9c7b8c819b5f3 Mon Sep 17 00:00:00 2001 From: tac0turtle Date: Mon, 5 May 2025 12:12:56 +0200 Subject: [PATCH 17/21] lint --- block/manager.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/block/manager.go b/block/manager.go index 6a6e139d77..5fcc6ee0bc 100644 --- a/block/manager.go +++ b/block/manager.go @@ -546,7 +546,7 @@ func (m *Manager) publishBlockInternal(ctx context.Context) error { return fmt.Errorf("timestamp is not monotonically increasing: %s < %s", batchData.Time, m.getLastBlockTime()) } m.logger.Info("Creating and publishing block", "height", newHeight) - m.logger.Debug("block info", "num_tx", len(batchData.Batch.Transactions)) + m.logger.Debug("block info", "num_tx", len(batchData.Transactions)) } header, data, err = m.createBlock(ctx, newHeight, lastSignature, lastHeaderHash, batchData) From f4d4f7f7a15e8b699997ce82c45db3e59a914855 Mon Sep 17 00:00:00 2001 From: Marko Date: Mon, 5 May 2025 13:48:53 +0200 Subject: [PATCH 18/21] Update block/aggregation.go --- block/aggregation.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/block/aggregation.go b/block/aggregation.go index 6a56a7fca3..6ae0437108 100644 --- a/block/aggregation.go +++ b/block/aggregation.go @@ -73,7 +73,7 @@ func (m *Manager) lazyAggregationLoop(ctx context.Context, blockTimer *time.Time } // produceBlock handles the common logic for producing a block and resetting timers -func (m *Manager) produceBlock(ctx context.Context, trigger string, lazyTimer, blockTimer *time.Timer) { +func (m *Manager) produceBlock(ctx context.Context, mode string, lazyTimer, blockTimer *time.Timer) { // Record the start time start := time.Now() From 802588e9501eda62cba1938145376531b3381e93 Mon Sep 17 00:00:00 2001 From: tac0turtle Date: Mon, 5 May 2025 13:49:24 +0200 Subject: [PATCH 19/21] amend trigger to mode --- block/aggregation.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/block/aggregation.go 
b/block/aggregation.go index 6ae0437108..29e2dd58d8 100644 --- a/block/aggregation.go +++ b/block/aggregation.go @@ -79,9 +79,9 @@ func (m *Manager) produceBlock(ctx context.Context, mode string, lazyTimer, bloc // Attempt to publish the block if err := m.publishBlock(ctx); err != nil && ctx.Err() == nil { - m.logger.Error("error while publishing block", "trigger", trigger, "error", err) + m.logger.Error("error while publishing block", "mode", mode, "error", err) } else { - m.logger.Debug("Successfully published block", "trigger", trigger) + m.logger.Debug("Successfully published block", "mode", mode) } // Reset both timers for the next aggregation window From b3e1d8307d12cf023c5b05737269dcd28ed5b8aa Mon Sep 17 00:00:00 2001 From: tac0turtle Date: Mon, 5 May 2025 14:23:51 +0200 Subject: [PATCH 20/21] reduce default da block time --- pkg/config/defaults.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pkg/config/defaults.go b/pkg/config/defaults.go index 7e00021b0b..6b5ce0474b 100644 --- a/pkg/config/defaults.go +++ b/pkg/config/defaults.go @@ -54,7 +54,7 @@ var DefaultConfig = Config{ }, DA: DAConfig{ Address: "http://localhost:7980", - BlockTime: DurationWrapper{15 * time.Second}, + BlockTime: DurationWrapper{6 * time.Second}, GasPrice: -1, GasMultiplier: 0, }, From 4d0c8be52d7c68a56b8dfc49bb096ca5c16dc1a5 Mon Sep 17 00:00:00 2001 From: tac0turtle Date: Mon, 5 May 2025 14:39:05 +0200 Subject: [PATCH 21/21] fix test --- pkg/config/config_test.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pkg/config/config_test.go b/pkg/config/config_test.go index 7fd6c0704f..da44d7fd7a 100644 --- a/pkg/config/config_test.go +++ b/pkg/config/config_test.go @@ -27,7 +27,7 @@ func TestDefaultConfig(t *testing.T) { assert.Equal(t, "", def.DA.SubmitOptions) assert.Equal(t, "", def.DA.Namespace) assert.Equal(t, 1*time.Second, def.Node.BlockTime.Duration) - assert.Equal(t, 15*time.Second, def.DA.BlockTime.Duration) + assert.Equal(t, 
6*time.Second, def.DA.BlockTime.Duration) assert.Equal(t, uint64(0), def.DA.StartHeight) assert.Equal(t, uint64(0), def.DA.MempoolTTL) assert.Equal(t, uint64(0), def.Node.MaxPendingBlocks)