forked from evstack/ev-node
-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathreaper.go
More file actions
199 lines (171 loc) · 4.91 KB
/
reaper.go
File metadata and controls
199 lines (171 loc) · 4.91 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
package reaping
import (
"context"
"crypto/sha256"
"encoding/hex"
"errors"
"fmt"
"sync"
"time"
"github.com/rs/zerolog"
"github.com/evstack/ev-node/block/internal/cache"
"github.com/evstack/ev-node/block/internal/executing"
coreexecutor "github.com/evstack/ev-node/core/execution"
coresequencer "github.com/evstack/ev-node/core/sequencer"
"github.com/evstack/ev-node/pkg/genesis"
)
const (
	// MaxBackoffInterval is the maximum backoff interval for retries.
	// reaperLoop caps its exponential backoff (interval * 2^n) at this value
	// after consecutive SubmitTxs failures.
	MaxBackoffInterval = 30 * time.Second
)
// Reaper is responsible for periodically retrieving transactions from the executor,
// filtering out already seen transactions, and submitting new transactions to the sequencer.
type Reaper struct {
	exec      coreexecutor.Executor // source of pending transactions (GetTxs)
	sequencer coresequencer.Sequencer // destination for new transaction batches
	chainID   string        // chain identifier, sent as the batch Id
	interval  time.Duration // base polling interval between reap attempts
	cache     cache.CacheManager // tracks seen tx hashes to avoid resubmission
	executor  *executing.Executor // notified when new txs are accepted by the sequencer
	// shared components
	logger zerolog.Logger
	// Lifecycle
	ctx    context.Context    // set in Start; derived from the caller's context
	cancel context.CancelFunc // cancels ctx; invoked by Stop
	wg     sync.WaitGroup     // tracks the reaperLoop goroutine
}
// NewReaper creates a new Reaper instance.
//
// exec and sequencer provide the tx source and sink, genesis supplies the
// chain ID, and scrapeInterval controls how often the reaper polls the
// executor. Returns an error if executor or cache is nil, or if the
// interval is not strictly positive.
func NewReaper(
	exec coreexecutor.Executor,
	sequencer coresequencer.Sequencer,
	genesis genesis.Genesis,
	logger zerolog.Logger,
	executor *executing.Executor,
	cache cache.CacheManager,
	scrapeInterval time.Duration,
) (*Reaper, error) {
	if executor == nil {
		return nil, errors.New("executor cannot be nil")
	}
	if cache == nil {
		return nil, errors.New("cache cannot be nil")
	}
	// Reject negative values too: time.NewTicker panics on a non-positive
	// duration, which would crash reaperLoop at startup.
	if scrapeInterval <= 0 {
		return nil, errors.New("scrape interval must be positive")
	}
	return &Reaper{
		exec:      exec,
		sequencer: sequencer,
		chainID:   genesis.ChainID,
		interval:  scrapeInterval,
		logger:    logger.With().Str("component", "reaper").Logger(),
		cache:     cache,
		executor:  executor,
	}, nil
}
// Start begins the execution component. It derives a cancellable context
// from ctx and launches the background reaper loop; it never blocks.
func (r *Reaper) Start(ctx context.Context) error {
	r.ctx, r.cancel = context.WithCancel(ctx)

	// Run the reaper loop in its own goroutine, tracked by the wait group
	// so Stop can wait for a clean exit.
	r.wg.Add(1)
	go func() {
		defer r.wg.Done()
		r.reaperLoop()
	}()

	r.logger.Info().Dur("interval", r.interval).Msg("reaper started")
	return nil
}
// reaperLoop polls the executor for transactions on every tick, applying
// exponential backoff after consecutive failures, and periodically prunes
// old entries from the seen-tx cache. It exits when r.ctx is cancelled.
func (r *Reaper) reaperLoop() {
	ticker := time.NewTicker(r.interval)
	defer ticker.Stop()
	// Hourly cleanup of the seen-tx cache, independent of the reap cadence.
	cleanupTicker := time.NewTicker(1 * time.Hour)
	defer cleanupTicker.Stop()
	consecutiveFailures := 0
	for {
		select {
		case <-r.ctx.Done():
			return
		case <-ticker.C:
			err := r.SubmitTxs()
			if err != nil {
				// Increment failure counter and apply exponential backoff
				consecutiveFailures++
				backoff := r.interval * time.Duration(1<<min(consecutiveFailures, 5)) // Cap at 2^5 = 32x
				// NOTE(review): if r.interval > MaxBackoffInterval this cap makes
				// the failure retry FASTER than the normal cadence — confirm intended.
				backoff = min(backoff, MaxBackoffInterval)
				r.logger.Warn().
					Err(err).
					Int("consecutive_failures", consecutiveFailures).
					Dur("next_retry_in", backoff).
					Msg("reaper encountered error, applying backoff")
				// Reset ticker with backoff interval
				ticker.Reset(backoff)
			} else {
				// Reset failure counter and backoff on success
				if consecutiveFailures > 0 {
					r.logger.Info().Msg("reaper recovered from errors, resetting backoff")
					consecutiveFailures = 0
					ticker.Reset(r.interval)
				}
			}
		case <-cleanupTicker.C:
			// Clean up transaction hashes older than 24 hours
			// This prevents unbounded growth of the transaction seen cache
			removed := r.cache.CleanupOldTxs(cache.DefaultTxCacheRetention)
			if removed > 0 {
				r.logger.Info().Int("removed", removed).Msg("cleaned up old transaction hashes")
			}
		}
	}
}
// Stop shuts down the reaper component. It cancels the loop's context (if
// Start was ever called) and blocks until the background goroutine exits.
// Always returns nil; calling Stop on a never-started Reaper is safe.
func (r *Reaper) Stop() error {
	if cancel := r.cancel; cancel != nil {
		cancel()
	}
	r.wg.Wait()
	r.logger.Info().Msg("reaper stopped")
	return nil
}
// SubmitTxs retrieves transactions from the executor and submits them to the sequencer.
// Transactions whose hash is already in the seen-tx cache are skipped; txs are
// marked seen only after a successful submission so failed batches are retried.
// Returns an error if any critical operation fails (errors are wrapped, not
// logged here — the caller's retry loop logs them with backoff context).
func (r *Reaper) SubmitTxs() error {
	txs, err := r.exec.GetTxs(r.ctx)
	if err != nil {
		return fmt.Errorf("failed to get txs from executor: %w", err)
	}
	if len(txs) == 0 {
		r.logger.Debug().Msg("no new txs")
		return nil
	}

	// Filter out already-seen txs, remembering each hash so it is not
	// recomputed when marking txs as seen after submission succeeds.
	newTxs := make([][]byte, 0, len(txs))
	newHashes := make([]string, 0, len(txs))
	for _, tx := range txs {
		if txHash := hashTx(tx); !r.cache.IsTxSeen(txHash) {
			newTxs = append(newTxs, tx)
			newHashes = append(newHashes, txHash)
		}
	}
	if len(newTxs) == 0 {
		r.logger.Debug().Msg("no new txs to submit")
		return nil
	}

	r.logger.Debug().Int("txCount", len(newTxs)).Msg("submitting txs to sequencer")
	_, err = r.sequencer.SubmitBatchTxs(r.ctx, coresequencer.SubmitBatchTxsRequest{
		Id:    []byte(r.chainID),
		Batch: &coresequencer.Batch{Transactions: newTxs},
	})
	if err != nil {
		return fmt.Errorf("failed to submit txs to sequencer: %w", err)
	}

	// Mark txs as seen only after the sequencer accepted the batch.
	for _, txHash := range newHashes {
		r.cache.SetTxSeen(txHash)
	}

	// Notify the executor that new transactions are available.
	// (newTxs is guaranteed non-empty at this point.)
	r.logger.Debug().Msg("notifying executor of new transactions")
	r.executor.NotifyNewTransactions()

	r.logger.Debug().Msg("successfully submitted txs")
	return nil
}
func hashTx(tx []byte) string {
hash := sha256.Sum256(tx)
return hex.EncodeToString(hash[:])
}