From b5841395ba9d331378ddf7eb2f22717f1f7fc802 Mon Sep 17 00:00:00 2001 From: tac0turtle Date: Fri, 2 May 2025 14:27:19 +0200 Subject: [PATCH 1/7] add a proposal for lazy --- block/aggregation.go | 97 ++++++++++++++++++++++--- block/lazy_aggregation_test.go | 127 +++++++++++++++++++++++++++++++++ block/manager.go | 73 +++++++++++++------ block/reaper.go | 22 ++++-- node/full.go | 23 +++--- 5 files changed, 296 insertions(+), 46 deletions(-) diff --git a/block/aggregation.go b/block/aggregation.go index 51507ebfdf..f2df8acb68 100644 --- a/block/aggregation.go +++ b/block/aggregation.go @@ -2,6 +2,7 @@ package block import ( "context" + "errors" "time" ) @@ -49,30 +50,66 @@ func (m *Manager) lazyAggregationLoop(ctx context.Context, blockTimer *time.Time lazyTimer := time.NewTimer(0) defer lazyTimer.Stop() + // Initialize the throttle timer but don't start it yet + m.txNotifyThrottle = time.NewTimer(m.minTxNotifyInterval) + if !m.txNotifyThrottle.Stop() { + <-m.txNotifyThrottle.C // Drain the channel if it already fired + } + defer m.txNotifyThrottle.Stop() + + // Add a polling timer to periodically check for transactions + // This is a fallback mechanism in case notifications fail + txPollTimer := time.NewTimer(2 * time.Second) + defer txPollTimer.Stop() + for { select { case <-ctx.Done(): return case <-lazyTimer.C: + m.logger.Debug("Lazy timer triggered block production") + m.produceBlock(ctx, "lazy_timer", lazyTimer, blockTimer) + case <-blockTimer.C: - } + m.logger.Debug("Block timer triggered block production") + // m.produceBlock(ctx, "block_timer", lazyTimer, blockTimer) + + case <-m.txNotifyCh: + // Only proceed if we're not being throttled + if time.Since(m.lastTxNotifyTime) < m.minTxNotifyInterval { + m.logger.Debug("Transaction notification throttled") + continue + } - // Reset the start time - start := time.Now() + m.logger.Debug("Transaction notification triggered block production") + m.produceBlock(ctx, "tx_notification", lazyTimer, blockTimer) - // Attempt to publish the block regardless of activity - if err := m.publishBlock(ctx); err != nil && ctx.Err() == nil { - m.logger.Error("error while publishing block", "error", err) - } + // Update the last notification time + m.lastTxNotifyTime = time.Now() - // Reset both timers for the next aggregation window - lazyTimer.Reset(getRemainingSleep(start, m.config.Node.LazyBlockInterval.Duration)) - blockTimer.Reset(getRemainingSleep(start, m.config.Node.BlockTime.Duration)) + case <-txPollTimer.C: + // Check if there are transactions available + hasTxs, err := m.checkForTransactions(ctx) + if err != nil { + m.logger.Error("Failed to check for transactions", "error", err) + } else if hasTxs { + m.logger.Debug("Transaction poll detected transactions") + m.produceBlock(ctx, "tx_poll", lazyTimer, blockTimer) + } + txPollTimer.Reset(2 * time.Second) + } } } func (m *Manager) normalAggregationLoop(ctx context.Context, blockTimer *time.Timer) { + // Initialize the throttle timer but don't start it yet + m.txNotifyThrottle = time.NewTimer(m.minTxNotifyInterval) + if !m.txNotifyThrottle.Stop() { + <-m.txNotifyThrottle.C // Drain the channel if it already fired + } + defer m.txNotifyThrottle.Stop() + for { select { case <-ctx.Done(): @@ -87,10 +124,30 @@ func (m *Manager) normalAggregationLoop(ctx context.Context, blockTimer *time.Ti // Reset the blockTimer to signal the next block production // period based on the block time. blockTimer.Reset(getRemainingSleep(start, m.config.Node.BlockTime.Duration)) + + case <-m.txNotifyCh: + // clear notification channel } } } +// produceBlock handles the common logic for producing a block and resetting timers +func (m *Manager) produceBlock(ctx context.Context, trigger string, lazyTimer, blockTimer *time.Timer) { + // Record the start time + start := time.Now() + + // Attempt to publish the block + if err := m.publishBlock(ctx); err != nil && ctx.Err() == nil { + m.logger.Error("error while publishing block", "trigger", trigger, "error", err) + } else { + m.logger.Debug("Successfully published block", "trigger", trigger) + } + + // Reset both timers for the next aggregation window + lazyTimer.Reset(getRemainingSleep(start, m.config.Node.LazyBlockInterval.Duration)) + blockTimer.Reset(getRemainingSleep(start, m.config.Node.BlockTime.Duration)) +} + func getRemainingSleep(start time.Time, interval time.Duration) time.Duration { elapsed := time.Since(start) @@ -100,3 +157,23 @@ func getRemainingSleep(start time.Time, interval time.Duration) time.Duration { return time.Millisecond } + +// checkForTransactions checks if there are any transactions available for processing +func (m *Manager) checkForTransactions(ctx context.Context) (bool, error) { + // Try to retrieve a batch without actually consuming it + batchData, err := m.retrieveBatch(ctx) + if err != nil { + if errors.Is(err, ErrNoBatch) { + // This is expected when there are no transactions + return false, nil + } + return false, err + } + + // If we got a batch with transactions, return true + if batchData != nil && batchData.Batch != nil && len(batchData.Batch.Transactions) > 0 { + return true, nil + } + + return false, nil +} diff --git a/block/lazy_aggregation_test.go b/block/lazy_aggregation_test.go index d6b2dce48b..1e34726c98 100644 --- a/block/lazy_aggregation_test.go +++ b/block/lazy_aggregation_test.go @@ -226,3 +226,130 @@ func TestGetRemainingSleep(t *testing.T) { sleep4 := getRemainingSleep(start4, interval) assert.InDelta(interval, sleep4, float64(5*time.Millisecond), "Case 4 failed") } + +// TestLazyAggregationLoop_TxNotification tests that transaction notifications trigger block production in lazy mode +func TestLazyAggregationLoop_TxNotification(t *testing.T) { + require := require.New(t) + + blockTime := 200 * time.Millisecond + lazyTime := 500 * time.Millisecond + m, pubMock := setupTestManager(t, blockTime, lazyTime) + m.config.Node.LazyMode = true + + // Create the notification channel + m.txNotifyCh = make(chan struct{}, 1) + m.minTxNotifyInterval = 50 * time.Millisecond + m.lastTxNotifyTime = time.Time{} + + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + var wg sync.WaitGroup + wg.Add(1) + go func() { + defer wg.Done() + blockTimer := time.NewTimer(blockTime) + defer blockTimer.Stop() + m.lazyAggregationLoop(ctx, blockTimer) + }() + + // Wait a bit to ensure the loop is running + time.Sleep(20 * time.Millisecond) + + // Send a transaction notification + m.NotifyNewTransactions() + + // Wait for the block to be published + select { + case <-pubMock.calls: + // Block was published, which is what we expect + case <-time.After(100 * time.Millisecond): + require.Fail("Block was not published after transaction notification") + } + + // Send another notification immediately - it should be throttled + m.NotifyNewTransactions() + + // No block should be published immediately due to throttling + select { + case <-pubMock.calls: + require.Fail("Block was published despite throttling") + case <-time.After(30 * time.Millisecond): + // This is expected - no immediate block + } + + // Wait for the throttle period to pass + time.Sleep(m.minTxNotifyInterval) + + // Now a block should be published + select { + case <-pubMock.calls: + // Block was published after throttle period + case <-time.After(100 * time.Millisecond): + require.Fail("Block was not published after throttle period") + } + + cancel() + wg.Wait() +} + +// TestNormalAggregationLoop_TxNotification tests that transaction notifications are handled in normal mode +func TestNormalAggregationLoop_TxNotification(t *testing.T) { + require := require.New(t) + + blockTime := 100 * time.Millisecond + m, pubMock := setupTestManager(t, blockTime, 0) + m.config.Node.LazyMode = false + + // Create the notification channel + m.txNotifyCh = make(chan struct{}, 1) + m.minTxNotifyInterval = 50 * time.Millisecond + m.lastTxNotifyTime = time.Time{} + + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + var wg sync.WaitGroup + wg.Add(1) + go func() { + defer wg.Done() + blockTimer := time.NewTimer(blockTime) + defer blockTimer.Stop() + m.normalAggregationLoop(ctx, blockTimer) + }() + + // Wait for the first block to be published by the timer + select { + case <-pubMock.calls: + // Block was published by timer, which is expected + case <-time.After(blockTime * 2): + require.Fail("Block was not published by timer") + } + + // Reset the publish mock to track new calls + pubMock.reset() + + // Send a transaction notification + m.NotifyNewTransactions() + + // In normal mode, the notification should not trigger an immediate block + select { + case <-pubMock.calls: + // If we enable the optional enhancement to reset the timer, this might happen + // But with the current implementation, this should not happen + require.Fail("Block was published immediately after notification in normal mode") + case <-time.After(blockTime / 2): + // This is expected - no immediate block + } + + // Wait for the next regular block + select { + case <-pubMock.calls: + // Block was published by timer, which is expected + case <-time.After(blockTime * 2): + require.Fail("Block was not published by timer after notification") + } + + cancel() + wg.Wait() +} diff --git a/block/manager.go b/block/manager.go index f88969cbe4..65023600b3 100644 --- a/block/manager.go +++ b/block/manager.go @@ -152,6 +152,18 @@ type Manager struct { // publishBlock is the function used to publish blocks. It defaults to // the manager's publishBlock method but can be overridden for testing. publishBlock publishBlockFunc + + // txNotifyCh is used to signal when new transactions are available + txNotifyCh chan struct{} + + // txNotifyThrottle controls how frequently we can produce blocks in response to tx notifications + txNotifyThrottle *time.Timer + + // lastTxNotifyTime tracks when we last produced a block due to tx notification + lastTxNotifyTime time.Time + + // minTxNotifyInterval is the minimum time between tx-triggered block productions + minTxNotifyInterval time.Duration } // getInitialState tries to load lastState from Store, and if it's not available it reads genesis. @@ -323,6 +335,9 @@ func NewManager( daH := atomic.Uint64{} daH.Store(s.DAHeight) + // Default minimum interval between tx-triggered blocks (can be made configurable) + minTxNotifyInterval := 500 * time.Millisecond + agg := &Manager{ signer: signer, config: config, @@ -332,27 +347,30 @@ func NewManager( dalc: dalc, daHeight: &daH, // channels are buffered to avoid blocking on input/output operations, buffer sizes are arbitrary - HeaderCh: make(chan *types.SignedHeader, channelLength), - DataCh: make(chan *types.Data, channelLength), - headerInCh: make(chan NewHeaderEvent, headerInChLength), - dataInCh: make(chan NewDataEvent, headerInChLength), - headerStoreCh: make(chan struct{}, 1), - dataStoreCh: make(chan struct{}, 1), - headerStore: headerStore, - dataStore: dataStore, - lastStateMtx: new(sync.RWMutex), - lastBatchData: lastBatchData, - headerCache: cache.NewCache[types.SignedHeader](), - dataCache: cache.NewCache[types.Data](), - retrieveCh: make(chan struct{}, 1), - logger: logger, - buildingBlock: false, - pendingHeaders: pendingHeaders, - metrics: seqMetrics, - sequencer: sequencer, - exec: exec, - gasPrice: gasPrice, - gasMultiplier: gasMultiplier, + HeaderCh: make(chan *types.SignedHeader, channelLength), + DataCh: make(chan *types.Data, channelLength), + headerInCh: make(chan NewHeaderEvent, headerInChLength), + dataInCh: make(chan NewDataEvent, headerInChLength), + headerStoreCh: make(chan struct{}, 1), + dataStoreCh: make(chan struct{}, 1), + headerStore: headerStore, + dataStore: dataStore, + lastStateMtx: new(sync.RWMutex), + lastBatchData: lastBatchData, + headerCache: cache.NewCache[types.SignedHeader](), + dataCache: cache.NewCache[types.Data](), + retrieveCh: make(chan struct{}, 1), + logger: logger, + buildingBlock: false, + pendingHeaders: pendingHeaders, + metrics: seqMetrics, + sequencer: sequencer, + exec: exec, + gasPrice: gasPrice, + gasMultiplier: gasMultiplier, + txNotifyCh: make(chan struct{}, 1), // Non-blocking channel + minTxNotifyInterval: minTxNotifyInterval, + lastTxNotifyTime: time.Time{}, // Zero time } agg.init(ctx) // Set the default publishBlock implementation @@ -942,3 +960,16 @@ func (m *Manager) getSignature(header types.Header) (types.Signature, error) { } return m.signer.Sign(b) } + +// NotifyNewTransactions signals that new transactions are available for processing +// This method will be called by the Reaper when it receives new transactions +func (m *Manager) NotifyNewTransactions() { + // Non-blocking send to avoid slowing down the transaction submission path + select { + case m.txNotifyCh <- struct{}{}: + // Successfully sent notification + default: + // Channel buffer is full, which means a notification is already pending + // This is fine, as we just need to trigger one block production + } +} diff --git a/block/reaper.go b/block/reaper.go index a517747bf0..8464d07ca8 100644 --- a/block/reaper.go +++ b/block/reaper.go @@ -27,6 +27,7 @@ type Reaper struct { logger log.Logger ctx context.Context seenStore ds.Batching + manager *Manager } // NewReaper creates a new Reaper instance with persistent seenTx storage. @@ -45,6 +46,11 @@ func NewReaper(ctx context.Context, exec coreexecutor.Executor, sequencer corese } } +// SetManager sets the Manager reference for transaction notifications +func (r *Reaper) SetManager(manager *Manager) { + r.manager = manager +} + // Start begins the reaping process at the specified interval. func (r *Reaper) Start(ctx context.Context) { r.ctx = ctx @@ -72,6 +78,11 @@ func (r *Reaper) SubmitTxs() { return } + if len(txs) == 0 { + r.logger.Debug("Reaper found no new txs to submit") + return + } + var newTxs [][]byte for _, tx := range txs { txHash := hashTx(tx) @@ -86,11 +97,6 @@ func (r *Reaper) SubmitTxs() { } } - if len(newTxs) == 0 { - r.logger.Debug("Reaper found no new txs to submit") - return - } - r.logger.Debug("Reaper submitting txs to sequencer", "txCount", len(newTxs)) _, err = r.sequencer.SubmitRollupBatchTxs(r.ctx, coresequencer.SubmitRollupBatchTxsRequest{ @@ -110,6 +116,12 @@ func (r *Reaper) SubmitTxs() { } } + // Notify the manager that new transactions are available + if r.manager != nil && len(newTxs) > 0 { + r.logger.Debug("Notifying manager of new transactions") + r.manager.NotifyNewTransactions() + } + r.logger.Debug("Reaper successfully submitted txs") } diff --git a/node/full.go b/node/full.go index 2609630f7f..98dd0f79a5 100644 --- a/node/full.go +++ b/node/full.go @@ -96,16 +96,6 @@ func newFullNode( store := store.New(mainKV) - reaper := block.NewReaper( - ctx, - exec, - sequencer, - genesis.ChainID, - nodeConfig.Node.BlockTime.Duration, - logger.With("module", "Reaper"), - mainKV, - ) - blockManager, err := initBlockManager( ctx, signer, @@ -126,6 +116,19 @@ func newFullNode( return nil, err } + reaper := block.NewReaper( + ctx, + exec, + sequencer, + genesis.ChainID, + nodeConfig.Node.BlockTime.Duration, + logger.With("module", "Reaper"), + mainKV, + ) + + // Connect the reaper to the manager for transaction notifications + reaper.SetManager(blockManager) + node := &FullNode{ genesis: genesis, nodeConfig: nodeConfig, From 64977427f6cd9e76c517d2fe7b617df1885b9810 Mon Sep 17 00:00:00 2001 From: tac0turtle Date: Fri, 2 May 2025 15:02:25 +0200 Subject: [PATCH 2/7] linting --- block/aggregation.go | 9 ++++++--- block/lazy_aggregation_test.go | 10 ++++++++++ specs/lazy-adr/adr-007-header-commit-to-shares.md | 3 --- specs/lazy-adr/adr-021-lazy-aggregation.md | 9 +++++---- 4 files changed, 21 insertions(+), 10 deletions(-) diff --git a/block/aggregation.go b/block/aggregation.go index f2df8acb68..80c2c060ae 100644 --- a/block/aggregation.go +++ b/block/aggregation.go @@ -73,7 +73,9 @@ func (m *Manager) lazyAggregationLoop(ctx context.Context, blockTimer *time.Time case <-blockTimer.C: m.logger.Debug("Block timer triggered block production") - // m.produceBlock(ctx, "block_timer", lazyTimer, blockTimer) + // Block production is intentionally disabled for the blockTimer case in lazy mode. + // This is because lazy mode prioritizes transaction-driven or periodic triggers + // (e.g., lazyTimer) over time-based triggers to optimize resource usage. case <-m.txNotifyCh: // Only proceed if we're not being throttled @@ -126,7 +128,8 @@ func (m *Manager) normalAggregationLoop(ctx context.Context, blockTimer *time.Ti blockTimer.Reset(getRemainingSleep(start, m.config.Node.BlockTime.Duration)) case <-m.txNotifyCh: - // clear notification channel + // Transaction notifications are intentionally ignored in normal mode + // to avoid triggering block production outside the scheduled intervals. } } } @@ -171,7 +174,7 @@ func (m *Manager) checkForTransactions(ctx context.Context) (bool, error) { } // If we got a batch with transactions, return true - if batchData != nil && batchData.Batch != nil && len(batchData.Batch.Transactions) > 0 { + if batchData != nil && batchData.Batch != nil && len(batchData.Transactions) > 0 { return true, nil } diff --git a/block/lazy_aggregation_test.go b/block/lazy_aggregation_test.go index 1e34726c98..f6371ad522 100644 --- a/block/lazy_aggregation_test.go +++ b/block/lazy_aggregation_test.go @@ -22,6 +22,16 @@ type mockPublishBlock struct { delay time.Duration // Optional delay to simulate processing time } +// reset clears the calls channel in mockPublishBlock. +func (m *mockPublishBlock) reset() { + m.mu.Lock() + defer m.mu.Unlock() + // Clear the channel + for len(m.calls) > 0 { + <-m.calls + } +} + func (m *mockPublishBlock) publish(ctx context.Context) error { m.mu.Lock() err := m.err diff --git a/specs/lazy-adr/adr-007-header-commit-to-shares.md b/specs/lazy-adr/adr-007-header-commit-to-shares.md index 283fece0ae..acc5d08cde 100644 --- a/specs/lazy-adr/adr-007-header-commit-to-shares.md +++ b/specs/lazy-adr/adr-007-header-commit-to-shares.md @@ -56,13 +56,10 @@ Discarded ### Positive - ### Negative - ### Neutral - ## References 1. diff --git a/specs/lazy-adr/adr-021-lazy-aggregation.md b/specs/lazy-adr/adr-021-lazy-aggregation.md index ea4fd0f3ae..4ff0192bb2 100644 --- a/specs/lazy-adr/adr-021-lazy-aggregation.md +++ b/specs/lazy-adr/adr-021-lazy-aggregation.md @@ -20,11 +20,12 @@ Leverage the existing empty batch mechanism and `dataHashForEmptyTxs` to maintai 1. **Modified Batch Retrieval**: ```go + func (m *Manager) retrieveBatch(ctx context.Context) (*BatchData, error) { - res, err := m.sequencer.GetBatch(ctx) - if err != nil { - return nil, err - } + res, err := m.sequencer.GetBatch(ctx) + if err != nil { + return nil, err + } if res != nil && res.Batch != nil { // Even if there are no transactions, return the batch with timestamp From 38bf2065773302ffb158bb67eea6298fbf1bae57 Mon Sep 17 00:00:00 2001 From: tac0turtle Date: Fri, 2 May 2025 15:51:25 +0200 Subject: [PATCH 3/7] minor cleanup --- block/aggregation.go | 53 +++++-------------- block/lazy_aggregation_test.go | 93 ++++++++-------------------------- 2 files changed, 34 insertions(+), 112 deletions(-) diff --git a/block/aggregation.go b/block/aggregation.go index 80c2c060ae..c5a95528b3 100644 --- a/block/aggregation.go +++ b/block/aggregation.go @@ -28,35 +28,28 @@ func (m *Manager) AggregationLoop(ctx context.Context) { time.Sleep(delay) } - // blockTimer is used to signal when to build a block based on the - // rollup block time. A timer is used so that the time to build a block - // can be taken into account. - blockTimer := time.NewTimer(0) - defer blockTimer.Stop() - // Lazy Aggregator mode. // In Lazy Aggregator mode, blocks are built only when there are // transactions or every LazyBlockTime. if m.config.Node.LazyMode { - m.lazyAggregationLoop(ctx, blockTimer) + m.lazyAggregationLoop(ctx) return } + // blockTimer is used to signal when to build a block based on the + // rollup block time. A timer is used so that the time to build a block + // can be taken into account. + blockTimer := time.NewTimer(0) + defer blockTimer.Stop() + m.normalAggregationLoop(ctx, blockTimer) } -func (m *Manager) lazyAggregationLoop(ctx context.Context, blockTimer *time.Timer) { +func (m *Manager) lazyAggregationLoop(ctx context.Context) { // lazyTimer triggers block publication even during inactivity lazyTimer := time.NewTimer(0) defer lazyTimer.Stop() - // Initialize the throttle timer but don't start it yet - m.txNotifyThrottle = time.NewTimer(m.minTxNotifyInterval) - if !m.txNotifyThrottle.Stop() { - <-m.txNotifyThrottle.C // Drain the channel if it already fired - } - defer m.txNotifyThrottle.Stop() - // Add a polling timer to periodically check for transactions // This is a fallback mechanism in case notifications fail txPollTimer := time.NewTimer(2 * time.Second) @@ -68,24 +61,14 @@ func (m *Manager) lazyAggregationLoop(ctx context.Context, blockTimer *time.Time return case <-lazyTimer.C: - m.logger.Debug("Lazy timer triggered block production") - m.produceBlock(ctx, "lazy_timer", lazyTimer, blockTimer) - - case <-blockTimer.C: - m.logger.Debug("Block timer triggered block production") - // Block production is intentionally disabled for the blockTimer case in lazy mode. - // This is because lazy mode prioritizes transaction-driven or periodic triggers - // (e.g., lazyTimer) over time-based triggers to optimize resource usage. + m.produceBlockLazy(ctx, "lazy_timer", lazyTimer) case <-m.txNotifyCh: // Only proceed if we're not being throttled if time.Since(m.lastTxNotifyTime) < m.minTxNotifyInterval { - m.logger.Debug("Transaction notification throttled") continue } - - m.logger.Debug("Transaction notification triggered block production") - m.produceBlock(ctx, "tx_notification", lazyTimer, blockTimer) + m.produceBlockLazy(ctx, "tx_notification", lazyTimer) // Update the last notification time m.lastTxNotifyTime = time.Now() @@ -96,8 +79,7 @@ func (m *Manager) lazyAggregationLoop(ctx context.Context, blockTimer *time.Time if err != nil { m.logger.Error("Failed to check for transactions", "error", err) } else if hasTxs { - m.logger.Debug("Transaction poll detected transactions") - m.produceBlock(ctx, "tx_poll", lazyTimer, blockTimer) + m.produceBlockLazy(ctx, "tx_poll", lazyTimer) } txPollTimer.Reset(2 * time.Second) } @@ -105,12 +87,6 @@ func (m *Manager) lazyAggregationLoop(ctx context.Context, blockTimer *time.Time } func (m *Manager) normalAggregationLoop(ctx context.Context, blockTimer *time.Timer) { - // Initialize the throttle timer but don't start it yet - m.txNotifyThrottle = time.NewTimer(m.minTxNotifyInterval) - if !m.txNotifyThrottle.Stop() { - <-m.txNotifyThrottle.C // Drain the channel if it already fired - } - defer m.txNotifyThrottle.Stop() for { select { @@ -134,8 +110,8 @@ func (m *Manager) normalAggregationLoop(ctx context.Context, blockTimer *time.Ti } } -// produceBlock handles the common logic for producing a block and resetting timers -func (m *Manager) produceBlock(ctx context.Context, trigger string, lazyTimer, blockTimer *time.Timer) { +// produceBlockLazy handles the common logic for producing a block and resetting timers in lazy mode +func (m *Manager) produceBlockLazy(ctx context.Context, trigger string, lazyTimer *time.Timer) { // Record the start time start := time.Now() @@ -146,9 +122,8 @@ func (m *Manager) produceBlock(ctx context.Context, trigger string, lazyTimer, b m.logger.Debug("Successfully published block", "trigger", trigger) } - // Reset both timers for the next aggregation window + // Reset the lazy timer for the next aggregation window lazyTimer.Reset(getRemainingSleep(start, m.config.Node.LazyBlockInterval.Duration)) - blockTimer.Reset(getRemainingSleep(start, m.config.Node.BlockTime.Duration)) } func getRemainingSleep(start time.Time, interval time.Duration) time.Duration { diff --git a/block/lazy_aggregation_test.go b/block/lazy_aggregation_test.go index f6371ad522..5b26a81c97 100644 --- a/block/lazy_aggregation_test.go +++ b/block/lazy_aggregation_test.go @@ -69,59 +69,13 @@ func setupTestManager(t *testing.T, blockTime, lazyTime time.Duration) (*Manager return m, pubMock } -// TestLazyAggregationLoop_BlockTimerTrigger tests that a block is published when the blockTimer fires first. -func TestLazyAggregationLoop_BlockTimerTrigger(t *testing.T) { - assert := assert.New(t) - require := require.New(t) - - blockTime := 50 * time.Millisecond - lazyTime := 200 * time.Millisecond // Lazy timer fires later - m, pubMock := setupTestManager(t, blockTime, lazyTime) - - ctx, cancel := context.WithCancel(context.Background()) - defer cancel() - - var wg sync.WaitGroup - wg.Add(1) - go func() { - defer wg.Done() - // Use real timers for this test to simulate actual timing - blockTimer := time.NewTimer(0) // Fire immediately first time - defer blockTimer.Stop() - m.lazyAggregationLoop(ctx, blockTimer) - }() - - // Wait for the first publish call triggered by the initial immediate blockTimer fire - select { - case <-pubMock.calls: - // Good, first block published - case <-time.After(2 * blockTime): // Give some buffer - require.Fail("timed out waiting for first block publication") - } - - // Wait for the second publish call, triggered by blockTimer reset - select { - case <-pubMock.calls: - // Good, second block published by blockTimer - case <-time.After(2 * blockTime): // Give some buffer - require.Fail("timed out waiting for second block publication (blockTimer)") - } - - // Ensure lazyTimer didn't trigger a publish yet - assert.Len(pubMock.calls, 0, "Expected no more publish calls yet") - - cancel() - wg.Wait() -} - -// TestLazyAggregationLoop_LazyTimerTrigger tests that a block is published when the lazyTimer fires first. +// TestLazyAggregationLoop_LazyTimerTrigger tests that a block is published when the lazyTimer fires. func TestLazyAggregationLoop_LazyTimerTrigger(t *testing.T) { - assert := assert.New(t) + require := require.New(t) - blockTime := 200 * time.Millisecond // Block timer fires later lazyTime := 50 * time.Millisecond - m, pubMock := setupTestManager(t, blockTime, lazyTime) + m, pubMock := setupTestManager(t, 0, lazyTime) // blockTime doesn't matter in lazy mode ctx, cancel := context.WithCancel(context.Background()) defer cancel() @@ -130,13 +84,10 @@ func TestLazyAggregationLoop_LazyTimerTrigger(t *testing.T) { wg.Add(1) go func() { defer wg.Done() - // Use real timers for this test - blockTimer := time.NewTimer(0) // Fire immediately first time - defer blockTimer.Stop() - m.lazyAggregationLoop(ctx, blockTimer) + m.lazyAggregationLoop(ctx) }() - // Wait for the first publish call triggered by the initial immediate blockTimer fire + // Wait for the first publish call triggered by the initial lazy timer fire select { case <-pubMock.calls: // Good, first block published @@ -152,9 +103,6 @@ func TestLazyAggregationLoop_LazyTimerTrigger(t *testing.T) { require.Fail("timed out waiting for second block publication (lazyTimer)") } - // Ensure blockTimer didn't trigger a publish yet - assert.Len(pubMock.calls, 0, "Expected no more publish calls yet") - cancel() wg.Wait() } @@ -163,9 +111,8 @@ func TestLazyAggregationLoop_LazyTimerTrigger(t *testing.T) { func TestLazyAggregationLoop_PublishError(t *testing.T) { require := require.New(t) - blockTime := 50 * time.Millisecond - lazyTime := 100 * time.Millisecond - m, pubMock := setupTestManager(t, blockTime, lazyTime) + lazyTime := 50 * time.Millisecond + m, pubMock := setupTestManager(t, 0, lazyTime) // blockTime doesn't matter in lazy mode pubMock.mu.Lock() pubMock.err = errors.New("publish failed") @@ -178,16 +125,13 @@ func TestLazyAggregationLoop_PublishError(t *testing.T) { wg.Add(1) go func() { defer wg.Done() - // Use real timers - blockTimer := time.NewTimer(0) - defer blockTimer.Stop() - m.lazyAggregationLoop(ctx, blockTimer) + m.lazyAggregationLoop(ctx) }() // Wait for the first publish attempt (which will fail) select { case <-pubMock.calls: - case <-time.After(2 * blockTime): + case <-time.After(2 * lazyTime): require.Fail("timed out waiting for first block publication attempt") } @@ -199,7 +143,7 @@ func TestLazyAggregationLoop_PublishError(t *testing.T) { // Wait for the second publish attempt (should succeed) select { case <-pubMock.calls: - case <-time.After(2 * blockTime): + case <-time.After(2 * lazyTime): require.Fail("timed out waiting for second block publication attempt after error") } @@ -241,9 +185,8 @@ func TestGetRemainingSleep(t *testing.T) { func TestLazyAggregationLoop_TxNotification(t *testing.T) { require := require.New(t) - blockTime := 200 * time.Millisecond lazyTime := 500 * time.Millisecond - m, pubMock := setupTestManager(t, blockTime, lazyTime) + m, pubMock := setupTestManager(t, 0, lazyTime) // blockTime doesn't matter in lazy mode m.config.Node.LazyMode = true // Create the notification channel @@ -258,14 +201,15 @@ func TestLazyAggregationLoop_TxNotification(t *testing.T) { wg.Add(1) go func() { defer wg.Done() - blockTimer := time.NewTimer(blockTime) - defer blockTimer.Stop() - m.lazyAggregationLoop(ctx, blockTimer) + m.lazyAggregationLoop(ctx) }() // Wait a bit to ensure the loop is running time.Sleep(20 * time.Millisecond) + // Reset the mock to clear any calls from the initial lazy timer + pubMock.reset() + // Send a transaction notification m.NotifyNewTransactions() @@ -277,6 +221,9 @@ func TestLazyAggregationLoop_TxNotification(t *testing.T) { require.Fail("Block was not published after transaction notification") } + // Reset the mock again to ensure we're only tracking new calls + pubMock.reset() + // Send another notification immediately - it should be throttled m.NotifyNewTransactions() @@ -288,8 +235,8 @@ func TestLazyAggregationLoop_TxNotification(t *testing.T) { // This is expected - no immediate block } - // Wait for the throttle period to pass - time.Sleep(m.minTxNotifyInterval) + // Wait for slightly longer than the throttle period to ensure the timer fires + time.Sleep(m.minTxNotifyInterval + 20*time.Millisecond) // Now a block should be published select { From ddedc5168e1457b0f9b4cd6b6673eb3a67133810 Mon Sep 17 00:00:00 2001 From: tac0turtle Date: Fri, 2 May 2025 16:20:17 +0200 Subject: [PATCH 4/7] simplify --- block/aggregation.go | 67 ++++++++++++++++++++++++---------- block/lazy_aggregation_test.go | 61 ++++++++++++++----------------- block/manager.go | 58 +++++++++++------------------ block/reaper_test.go | 17 +++------ 4 files changed, 102 insertions(+), 101 deletions(-) diff --git a/block/aggregation.go b/block/aggregation.go index c5a95528b3..6562620123 100644 --- a/block/aggregation.go +++ b/block/aggregation.go @@ -28,24 +28,24 @@ func (m *Manager) AggregationLoop(ctx context.Context) { time.Sleep(delay) } + // blockTimer is used to signal when to build a block based on the + // rollup block time. A timer is used so that the time to build a block + // can be taken into account. + blockTimer := time.NewTimer(0) + defer blockTimer.Stop() + // Lazy Aggregator mode. // In Lazy Aggregator mode, blocks are built only when there are // transactions or every LazyBlockTime. if m.config.Node.LazyMode { - m.lazyAggregationLoop(ctx) + m.lazyAggregationLoop(ctx, blockTimer) return } - // blockTimer is used to signal when to build a block based on the - // rollup block time. A timer is used so that the time to build a block - // can be taken into account. - blockTimer := time.NewTimer(0) - defer blockTimer.Stop() - m.normalAggregationLoop(ctx, blockTimer) } -func (m *Manager) lazyAggregationLoop(ctx context.Context) { +func (m *Manager) lazyAggregationLoop(ctx context.Context, blockTimer *time.Timer) { // lazyTimer triggers block publication even during inactivity lazyTimer := time.NewTimer(0) defer lazyTimer.Stop() @@ -55,31 +55,54 @@ func (m *Manager) lazyAggregationLoop(ctx context.Context) { txPollTimer := time.NewTimer(2 * time.Second) defer txPollTimer.Stop() + // Track if we've received transactions but haven't built a block yet + pendingTxs := false + for { select { case <-ctx.Done(): return case <-lazyTimer.C: - m.produceBlockLazy(ctx, "lazy_timer", lazyTimer) + m.buildingBlock = false + pendingTxs = false + m.produceBlockLazy(ctx, "lazy_timer", lazyTimer, blockTimer) + + case <-blockTimer.C: + if pendingTxs { + m.buildingBlock = true + m.produceBlockLazy(ctx, "tx_collection_complete", lazyTimer, blockTimer) + pendingTxs = false + } else { + // Reset the block timer if no pending transactions + blockTimer.Reset(m.config.Node.BlockTime.Duration) + } case <-m.txNotifyCh: - // Only proceed if we're not being throttled - if time.Since(m.lastTxNotifyTime) < m.minTxNotifyInterval { + if m.buildingBlock { continue } - m.produceBlockLazy(ctx, "tx_notification", lazyTimer) - // Update the last notification time - m.lastTxNotifyTime = time.Now() + // Instead of immediately producing a block, mark that we have pending transactions + // and let the block timer determine when to actually build the block + if !pendingTxs { + pendingTxs = true + m.logger.Debug("Received transaction notification, waiting for more transactions", + "collection_time", m.config.Node.BlockTime.Duration) + // Reset the block timer to wait for more transactions + blockTimer.Reset(m.config.Node.BlockTime.Duration) + } case <-txPollTimer.C: - // Check if there are transactions available hasTxs, err := m.checkForTransactions(ctx) if err != nil { m.logger.Error("Failed to check for transactions", "error", err) - } else if hasTxs { - m.produceBlockLazy(ctx, "tx_poll", lazyTimer) + } else if hasTxs && !m.buildingBlock && !pendingTxs { + // Same as with txNotifyCh - mark pending and wait for block timer + pendingTxs = true + m.logger.Debug("Found transactions in poll, waiting for more transactions", + "collection_time", m.config.Node.BlockTime.Duration) + blockTimer.Reset(m.config.Node.BlockTime.Duration) } txPollTimer.Reset(2 * time.Second) } @@ -93,14 +116,12 @@ func (m *Manager) normalAggregationLoop(ctx context.Context, blockTimer *time.Ti case <-ctx.Done(): return case <-blockTimer.C: - // Define the start time for the block production period start := time.Now() if err := m.publishBlock(ctx); err != nil && ctx.Err() == nil { m.logger.Error("error while publishing block", "error", err) } // Reset the blockTimer to signal the next block production - // period based on the block time. blockTimer.Reset(getRemainingSleep(start, m.config.Node.BlockTime.Duration)) case <-m.txNotifyCh: @@ -111,10 +132,15 @@ func (m *Manager) normalAggregationLoop(ctx context.Context, blockTimer *time.Ti } // produceBlockLazy handles the common logic for producing a block and resetting timers in lazy mode -func (m *Manager) produceBlockLazy(ctx context.Context, trigger string, lazyTimer *time.Timer) { +func (m *Manager) produceBlockLazy(ctx context.Context, trigger string, lazyTimer, blockTimer *time.Timer) { // Record the start time start := time.Now() + m.buildingBlock = true + defer func() { + m.buildingBlock = false + }() + // Attempt to publish the block if err := m.publishBlock(ctx); err != nil && ctx.Err() == nil { m.logger.Error("error while publishing block", "trigger", trigger, "error", err) @@ -124,6 +150,7 @@ func (m *Manager) produceBlockLazy(ctx context.Context, trigger string, lazyTime // Reset the lazy timer for the next aggregation window lazyTimer.Reset(getRemainingSleep(start, m.config.Node.LazyBlockInterval.Duration)) + blockTimer.Reset(getRemainingSleep(start, m.config.Node.BlockTime.Duration)) } func getRemainingSleep(start time.Time, interval time.Duration) time.Duration { diff --git a/block/lazy_aggregation_test.go b/block/lazy_aggregation_test.go index 5b26a81c97..85ba97da78 100644 --- a/block/lazy_aggregation_test.go +++ b/block/lazy_aggregation_test.go @@ -73,9 +73,9 @@ func setupTestManager(t *testing.T, blockTime, lazyTime time.Duration) (*Manager func TestLazyAggregationLoop_LazyTimerTrigger(t *testing.T) { require := require.New(t) - + blockTime := 20 * time.Millisecond lazyTime := 50 * time.Millisecond - m, pubMock := setupTestManager(t, 0, lazyTime) // blockTime doesn't matter in lazy mode + m, pubMock := setupTestManager(t, blockTime, lazyTime) ctx, cancel := context.WithCancel(context.Background()) defer cancel() @@ -84,7 +84,7 @@ func TestLazyAggregationLoop_LazyTimerTrigger(t *testing.T) { wg.Add(1) go func() { defer wg.Done() - m.lazyAggregationLoop(ctx) + m.lazyAggregationLoop(ctx, time.NewTimer(blockTime)) }() // Wait for the first publish call triggered by the initial lazy timer fire @@ -111,8 +111,9 @@ func TestLazyAggregationLoop_LazyTimerTrigger(t *testing.T) { func TestLazyAggregationLoop_PublishError(t *testing.T) { require := require.New(t) + blockTime := 20 * time.Millisecond lazyTime := 50 * time.Millisecond - m, pubMock := setupTestManager(t, 0, lazyTime) // blockTime doesn't matter in lazy mode + m, pubMock := setupTestManager(t, blockTime, lazyTime) // blockTime doesn't matter in lazy mode pubMock.mu.Lock() pubMock.err = errors.New("publish failed") @@ -125,7 +126,7 @@ func TestLazyAggregationLoop_PublishError(t *testing.T) { wg.Add(1) go func() { defer wg.Done() - m.lazyAggregationLoop(ctx) + m.lazyAggregationLoop(ctx, time.NewTimer(blockTime)) }() // Wait for the first publish attempt (which will fail) @@ -186,13 +187,12 @@ func TestLazyAggregationLoop_TxNotification(t *testing.T) { require := require.New(t) lazyTime := 500 * time.Millisecond - m, pubMock := setupTestManager(t, 0, lazyTime) // blockTime doesn't matter in lazy mode + blockTime := 50 * time.Millisecond + m, pubMock := setupTestManager(t, blockTime, lazyTime) m.config.Node.LazyMode = true // Create the notification channel m.txNotifyCh = make(chan struct{}, 1) - m.minTxNotifyInterval = 50 * time.Millisecond - m.lastTxNotifyTime = time.Time{} ctx, cancel := context.WithCancel(context.Background()) defer cancel() @@ -201,7 +201,9 @@ func TestLazyAggregationLoop_TxNotification(t *testing.T) { wg.Add(1) go func() { defer wg.Done() - m.lazyAggregationLoop(ctx) + blockTimer := time.NewTimer(blockTime) + defer blockTimer.Stop() + m.lazyAggregationLoop(ctx, blockTimer) }() // Wait a bit to ensure the loop is running @@ -213,37 +215,34 @@ func TestLazyAggregationLoop_TxNotification(t *testing.T) { // Send a transaction notification m.NotifyNewTransactions() - // Wait for the block to be published + // No block should be published immediately - we should wait for the block timer select { case <-pubMock.calls: - // Block was published, which is what we expect - case <-time.After(100 * time.Millisecond): - require.Fail("Block was not published after transaction notification") + require.Fail("Block was published immediately, but should wait for block timer") + case <-time.After(20 * time.Millisecond): + // This is expected - no immediate block } - // Reset the mock again to ensure we're only tracking new calls - pubMock.reset() - - // Send another notification immediately - it should be throttled - m.NotifyNewTransactions() - - // No block should be published immediately due to throttling + // Wait for the block timer to fire and trigger block production select { case <-pubMock.calls: - require.Fail("Block was published despite throttling") - case <-time.After(30 * time.Millisecond): - // This is expected - no immediate block + // Block was published after block timer, which is what we expect + case <-time.After(blockTime * 2): + require.Fail("Block was not published after block timer expired") } - // Wait for slightly longer than the throttle period to ensure the timer fires - time.Sleep(m.minTxNotifyInterval + 20*time.Millisecond) + // Reset the mock again to ensure we're only tracking new calls + pubMock.reset() + + // Send another notification immediately + m.NotifyNewTransactions() - // Now a block should be published + // Wait for the block timer to fire again select { case <-pubMock.calls: - // Block was published after throttle period - case <-time.After(100 * time.Millisecond): - require.Fail("Block was not published after throttle period") + // Block was published after block timer, which is what we expect + case <-time.After(blockTime * 2): + require.Fail("Block was not published after second block timer expired") } cancel() @@ -260,8 +259,6 @@ func TestNormalAggregationLoop_TxNotification(t *testing.T) { // Create the notification channel m.txNotifyCh = make(chan struct{}, 1) - m.minTxNotifyInterval = 50 * time.Millisecond - m.lastTxNotifyTime = time.Time{} ctx, cancel := context.WithCancel(context.Background()) defer cancel() @@ -292,8 +289,6 @@ func TestNormalAggregationLoop_TxNotification(t *testing.T) { // In normal mode, the notification should not trigger an immediate block select { case <-pubMock.calls: - // If we enable the optional enhancement to reset the timer, this might happen - // But with the current implementation, this should not happen require.Fail("Block was published immediately after notification in normal mode") case <-time.After(blockTime / 2): // This is expected - no immediate block diff --git a/block/manager.go b/block/manager.go index 65023600b3..30bab18f6b 100644 --- a/block/manager.go +++ b/block/manager.go @@ -155,15 +155,6 @@ type Manager struct { // txNotifyCh is used to signal when new transactions are available txNotifyCh chan struct{} - - // txNotifyThrottle controls how frequently we can produce blocks in response to tx notifications - txNotifyThrottle *time.Timer - - // lastTxNotifyTime tracks when we last produced a block due to tx notification - lastTxNotifyTime time.Time - - // minTxNotifyInterval is the minimum time between tx-triggered block productions - minTxNotifyInterval time.Duration } // getInitialState tries to load lastState from Store, and if it's not available it reads genesis. @@ -335,9 +326,6 @@ func NewManager( daH := atomic.Uint64{} daH.Store(s.DAHeight) - // Default minimum interval between tx-triggered blocks (can be made configurable) - minTxNotifyInterval := 500 * time.Millisecond - agg := &Manager{ signer: signer, config: config, @@ -347,30 +335,28 @@ func NewManager( dalc: dalc, daHeight: &daH, // channels are buffered to avoid blocking on input/output operations, buffer sizes are arbitrary - HeaderCh: make(chan *types.SignedHeader, channelLength), - DataCh: make(chan *types.Data, channelLength), - headerInCh: make(chan NewHeaderEvent, headerInChLength), - dataInCh: make(chan NewDataEvent, headerInChLength), - headerStoreCh: make(chan struct{}, 1), - dataStoreCh: make(chan struct{}, 1), - headerStore: headerStore, - dataStore: dataStore, - lastStateMtx: new(sync.RWMutex), - lastBatchData: lastBatchData, - headerCache: cache.NewCache[types.SignedHeader](), - dataCache: cache.NewCache[types.Data](), - retrieveCh: make(chan struct{}, 1), - logger: logger, - buildingBlock: false, - pendingHeaders: pendingHeaders, - metrics: seqMetrics, - sequencer: sequencer, - exec: exec, - gasPrice: gasPrice, - gasMultiplier: gasMultiplier, - txNotifyCh: make(chan struct{}, 1), // Non-blocking channel - minTxNotifyInterval: minTxNotifyInterval, - lastTxNotifyTime: time.Time{}, // Zero time + HeaderCh: make(chan *types.SignedHeader, channelLength), + DataCh: make(chan *types.Data, channelLength), + headerInCh: make(chan NewHeaderEvent, headerInChLength), + dataInCh: make(chan NewDataEvent, headerInChLength), + headerStoreCh: make(chan struct{}, 1), + dataStoreCh: make(chan struct{}, 1), + headerStore: headerStore, + dataStore: dataStore, + lastStateMtx: new(sync.RWMutex), + lastBatchData: lastBatchData, + headerCache: cache.NewCache[types.SignedHeader](), + dataCache: cache.NewCache[types.Data](), + retrieveCh: make(chan struct{}, 1), + logger: logger, + buildingBlock: false, + pendingHeaders: pendingHeaders, + metrics: seqMetrics, + sequencer: sequencer, + exec: exec, + gasPrice: gasPrice, + gasMultiplier: gasMultiplier, + txNotifyCh: make(chan struct{}, 1), // Non-blocking channel } agg.init(ctx) // Set the default publishBlock implementation diff --git a/block/reaper_test.go b/block/reaper_test.go index a6cec844d2..d4bf6487f8 100644 --- a/block/reaper_test.go +++ b/block/reaper_test.go @@ -1,7 +1,6 @@ package block import ( - "context" "crypto/sha256" "encoding/hex" "testing" @@ -18,8 +17,6 @@ import ( ) func TestReaper_SubmitTxs_Success(t *testing.T) { - ctx, cancel := context.WithCancel(context.Background()) - defer cancel() mockExec := testmocks.NewExecutor(t) mockSeq := testmocks.NewSequencer(t) @@ -28,7 +25,7 @@ func TestReaper_SubmitTxs_Success(t *testing.T) { chainID := "test-chain" interval := 100 * time.Millisecond - reaper := NewReaper(ctx, mockExec, mockSeq, chainID, interval, logger, store) + reaper := NewReaper(t.Context(), mockExec, mockSeq, chainID, interval, logger, store) // Prepare transaction and its hash tx := []byte("tx1") @@ -55,8 +52,6 @@ func TestReaper_SubmitTxs_Success(t *testing.T) { } func TestReaper_SubmitTxs_NoTxs(t *testing.T) { - ctx, cancel := context.WithCancel(context.Background()) - defer cancel() mockExec := testmocks.NewExecutor(t) mockSeq := testmocks.NewSequencer(t) @@ -65,7 +60,7 @@ func TestReaper_SubmitTxs_NoTxs(t *testing.T) { chainID := "test-chain" interval := 100 * time.Millisecond - reaper := NewReaper(ctx, mockExec, mockSeq, chainID, interval, logger, store) + reaper := NewReaper(t.Context(), mockExec, mockSeq, chainID, interval, logger, store) // Mock GetTxs returning no transactions mockExec.On("GetTxs", mock.Anything).Return([][]byte{}, nil).Once() @@ -80,8 +75,6 @@ func TestReaper_SubmitTxs_NoTxs(t *testing.T) { func TestReaper_TxPersistence_AcrossRestarts(t *testing.T) { require := require.New(t) - ctx, cancel := context.WithCancel(context.Background()) - defer cancel() // Use separate mocks for each instance but share the store mockExec1 := testmocks.NewExecutor(t) @@ -100,7 +93,7 @@ func TestReaper_TxPersistence_AcrossRestarts(t *testing.T) { txKey := ds.NewKey(hex.EncodeToString(txHash[:])) // First reaper instance - reaper1 := NewReaper(ctx, mockExec1, mockSeq1, chainID, interval, logger, store) + reaper1 := NewReaper(t.Context(), mockExec1, mockSeq1, chainID, interval, logger, store) // Mock interactions for the first instance mockExec1.On("GetTxs", mock.Anything).Return([][]byte{tx}, nil).Once() @@ -112,12 +105,12 @@ func TestReaper_TxPersistence_AcrossRestarts(t *testing.T) { reaper1.SubmitTxs() // Verify the tx was marked as seen in the real store after the first run - has, err := store.Has(ctx, txKey) + has, err := store.Has(t.Context(), txKey) require.NoError(err) require.True(has, "Transaction should be marked as seen in the datastore after first submission") // Create a new reaper instance simulating a restart - reaper2 := NewReaper(ctx, mockExec2, mockSeq2, chainID, interval, logger, store) + reaper2 := NewReaper(t.Context(), mockExec2, mockSeq2, chainID, interval, logger, store) // Mock interactions for the second instance mockExec2.On("GetTxs", mock.Anything).Return([][]byte{tx}, nil).Once() From 090a0daba72da86c84514f12bddf410a570e4a0c Mon Sep 17 00:00:00 2001 From: tac0turtle Date: Fri, 2 May 2025 16:21:47 +0200 Subject: [PATCH 5/7] remove txpoll --- block/aggregation.go | 38 -------------------------------------- 1 file changed, 38 deletions(-) diff --git a/block/aggregation.go b/block/aggregation.go index 6562620123..57a9ee6673 100644 --- a/block/aggregation.go +++ b/block/aggregation.go @@ -2,7 +2,6 @@ package block import ( "context" - "errors" "time" ) @@ -50,11 +49,6 @@ func (m *Manager) lazyAggregationLoop(ctx context.Context, blockTimer *time.Time lazyTimer := time.NewTimer(0) defer lazyTimer.Stop() - // Add a polling timer to periodically check for transactions - // This is a fallback mechanism in case notifications fail - txPollTimer := time.NewTimer(2 * time.Second) - defer txPollTimer.Stop() - // Track if we've received transactions but haven't built a block yet pendingTxs := false @@ -93,18 +87,6 @@ func (m *Manager) lazyAggregationLoop(ctx context.Context, blockTimer *time.Time blockTimer.Reset(m.config.Node.BlockTime.Duration) } - case <-txPollTimer.C: - hasTxs, err := m.checkForTransactions(ctx) - if err != nil { - m.logger.Error("Failed to check for transactions", "error", err) - } else if hasTxs && !m.buildingBlock && !pendingTxs { - // Same as with txNotifyCh - mark pending and wait for block timer - pendingTxs = true - m.logger.Debug("Found transactions in poll, waiting for more transactions", - "collection_time", m.config.Node.BlockTime.Duration) - blockTimer.Reset(m.config.Node.BlockTime.Duration) - } - txPollTimer.Reset(2 * time.Second) } } } @@ -162,23 +144,3 @@ func getRemainingSleep(start time.Time, interval time.Duration) time.Duration { return time.Millisecond } - -// checkForTransactions checks if there are any transactions available for processing -func (m *Manager) checkForTransactions(ctx context.Context) (bool, error) { - // Try to retrieve a batch without actually consuming it - batchData, err := m.retrieveBatch(ctx) - if err != nil { - if errors.Is(err, ErrNoBatch) { - // This is expected when there are no transactions - return false, nil - } - return false, err - } - - // If we got a batch with transactions, return true - if batchData != nil && batchData.Batch != nil && len(batchData.Transactions) > 0 { - return true, nil - } - - return false, nil -} From 10592a28f4bf8170d1835048f5a7d11fb551da84 Mon Sep 17 00:00:00 2001 From: tac0turtle Date: Fri, 2 May 2025 16:33:15 +0200 Subject: [PATCH 6/7] remove buildingblock from manager struct --- block/aggregation.go | 15 ++++++--------- block/manager.go | 4 ---- 2 files changed, 6 insertions(+), 13 deletions(-) diff --git a/block/aggregation.go b/block/aggregation.go index 57a9ee6673..07ec6c2045 100644 --- a/block/aggregation.go +++ b/block/aggregation.go @@ -49,8 +49,8 @@ func (m *Manager) lazyAggregationLoop(ctx context.Context, blockTimer *time.Time lazyTimer := time.NewTimer(0) defer lazyTimer.Stop() - // Track if we've received transactions but haven't built a block yet pendingTxs := false + buildingBlock := false for { select { @@ -58,14 +58,16 @@ func (m *Manager) lazyAggregationLoop(ctx context.Context, blockTimer *time.Time return case <-lazyTimer.C: - m.buildingBlock = false pendingTxs = false + buildingBlock = true m.produceBlockLazy(ctx, "lazy_timer", lazyTimer, blockTimer) + buildingBlock = false case <-blockTimer.C: if pendingTxs { - m.buildingBlock = true + buildingBlock = true m.produceBlockLazy(ctx, "tx_collection_complete", lazyTimer, blockTimer) + buildingBlock = false pendingTxs = false } else { // Reset the block timer if no pending transactions @@ -73,7 +75,7 @@ func (m *Manager) lazyAggregationLoop(ctx context.Context, blockTimer *time.Time } case <-m.txNotifyCh: - if m.buildingBlock { + if buildingBlock { continue } @@ -118,11 +120,6 @@ func (m *Manager) produceBlockLazy(ctx context.Context, trigger string, lazyTime // Record the start time start := time.Now() - m.buildingBlock = true - defer func() { - m.buildingBlock = false - }() - // Attempt to publish the block if err := m.publishBlock(ctx); err != nil && ctx.Err() == nil { m.logger.Error("error while publishing block", "trigger", trigger, "error", err) diff --git a/block/manager.go b/block/manager.go index 30bab18f6b..f59dc0a3d0 100644 --- a/block/manager.go +++ b/block/manager.go @@ -129,9 +129,6 @@ type Manager struct { logger log.Logger - // For usage by Lazy Aggregator mode - buildingBlock bool - pendingHeaders *PendingHeaders // for reporting metrics @@ -349,7 +346,6 @@ func NewManager( dataCache: cache.NewCache[types.Data](), retrieveCh: make(chan struct{}, 1), logger: logger, - buildingBlock: false, pendingHeaders: pendingHeaders, metrics: seqMetrics, sequencer: sequencer, From af726cb3bb506a200f9bc1be96ad4588e72ba993 Mon Sep 17 00:00:00 2001 From: Marko Date: Fri, 2 May 2025 16:36:40 +0200 Subject: [PATCH 7/7] Update block/aggregation.go Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com> --- block/aggregation.go | 3 +++ 1 file changed, 3 insertions(+) diff --git a/block/aggregation.go b/block/aggregation.go index 07ec6c2045..65eee449d7 100644 --- a/block/aggregation.go +++ b/block/aggregation.go @@ -49,7 +49,10 @@ func (m *Manager) lazyAggregationLoop(ctx context.Context, blockTimer *time.Time lazyTimer := time.NewTimer(0) defer lazyTimer.Stop() + // pendingTxs tracks whether there are transactions waiting to be included in a block. pendingTxs := false + // buildingBlock indicates whether a block is currently being built, preventing + // new transactions from being processed during this time. buildingBlock := false for {