consensus: persist AppQC, blocks, and CommitQCs with async persistence#2896
wen-coding wants to merge 58 commits into main
Conversation
The latest Buf updates on your PR. Results from workflow Buf / buf (pull_request).
Codecov Report ❌ (patch coverage and impacted files):

```
@@           Coverage Diff            @@
##             main    #2896    +/-   ##
===========================================
+ Coverage   58.13%   77.23%   +19.09%
===========================================
  Files        2113       20     -2093
  Lines      174071     1735   -172336
===========================================
- Hits       101204     1340    -99864
+ Misses      63812      254    -63558
+ Partials     9055      141     -8914
```
|
Force-pushed ebf93df to f4a9c1e
Extract generic A/B file persistence into a reusable consensus/persist/ sub-package and add block-file persistence for crash-safe availability state recovery.

Changes:
- Move persist.go and persist_test.go into consensus/persist/ (git mv to preserve history), exporting Persister, NewPersister, WriteAndSync, SuffixA, SuffixB.
- Add persist/blocks.go: per-block file persistence using `<lane_hex>_<blocknum>.pb` files in a blocks/ subdirectory, with load, delete-before, and header-mismatch validation.
- Wire avail.NewState to accept stateDir, create an A/B persister for the AppQC and a BlockPersister for signed lane proposals, and restore both on restart (contiguous block runs, queue alignment).
- Update avail/state.go to persist the AppQC on prune and delete obsolete block files after each AppQC advance.
- Thread PersistentStateDir from consensus.Config through to avail.NewState.
- Expand the consensus/inner.go doc comment with the full persistence design (what, why, recovery, write behavior, rebroadcasting).
- Move TestRunOutputsPersistErrorPropagates to consensus/inner_test.go for proper package alignment.
- Add comprehensive tests for blocks persistence (empty dir, multi-lane, corrupt/mismatched skip, DeleteBefore, filename roundtrip).

Ref: sei-protocol/sei-v3#512
Co-authored-by: Cursor <cursoragent@cursor.com>
Move persisted data loading (AppQC deserialization and block loading) into a dedicated function for readability. Co-authored-by: Cursor <cursoragent@cursor.com>
Move block sorting, contiguous-prefix extraction, and gap truncation from avail/inner.go into persist/blocks.go so all disk-recovery logic lives in one place. This isolates storage concerns in the persistence layer, simplifying newInner and preparing for a future storage backend swap. Co-authored-by: Cursor <cursoragent@cursor.com>
Co-authored-by: Cursor <cursoragent@cursor.com>
PushBlock and ProduceBlock now add blocks to the in-memory queue immediately and send a persist job to a background goroutine via a buffered channel. The background writer fsyncs each block to disk and advances a per-lane blockPersisted cursor under the inner lock. RecvBatch gates on this cursor so votes are only signed for blocks that have been durably written to disk. When persistence is disabled (testing), the cursor is nil and RecvBatch falls back to bq.next. Co-authored-by: Cursor <cursoragent@cursor.com>
newInner no longer takes a separate persistEnabled bool; loaded != nil already implies persistence is enabled. Tests with loaded data now correctly reflect this. Co-authored-by: Cursor <cursoragent@cursor.com>
blockPersisted is reconstructed from disk on restart, not persisted itself. Move its creation to just above the block restoration loop (past the loaded==nil early return) so the code reads top-down. Co-authored-by: Cursor <cursoragent@cursor.com>
Force-pushed 05beddb to 2f0bbad
Co-authored-by: Cursor <cursoragent@cursor.com>
Move persistCh, persistJob, and the writer loop from avail/State into BlockPersister.Queue + BlockPersister.Run, so callers just call Queue() and the persist layer owns the channel, buffer sizing, and drain loop. Queue blocks with context to avoid holes in the sequential blockPersisted cursor (which would permanently stall voting). Call sites use utils.IgnoreAfterCancel to swallow shutdown errors. Co-authored-by: Cursor <cursoragent@cursor.com>
…l/sei-chain into wen/persist_appqc_and_blocks
Remove redundant loop that explicitly zeroed every lane. Map zero-values handle lanes without loaded blocks; only lanes with blocks on disk need an explicit write. Add comment explaining why starting at 0 is safe. Co-authored-by: Cursor <cursoragent@cursor.com>
- Add a comment explaining why votes are not persisted and why the votes queue must be advanced past loaded blocks on restart.
- Consolidate redundant tests: fold blockPersisted assertions into existing tests; remove TestNewInnerLoadedBlocksContiguousPrefix.
- Add a test that headers() returns ErrPruned for blocks before the loaded range (verifies votes queue advancement prevents hangs).

Co-authored-by: Cursor <cursoragent@cursor.com>
Replace require.Contains(err.Error(), "...") with require.Error(err). Callers don't branch on specific error messages, so string matching adds no value; the test name already documents what is being rejected. Co-authored-by: Cursor <cursoragent@cursor.com>
BlockPersister now owns the per-lane contiguous persistence cursor and passes the exclusive upper bound to the callback. The caller no longer needs to compute n+1 or guard against out-of-order completion. This localizes the ordering assumption (FIFO queue) inside BlockPersister, so switching to parallel storage only requires changing BlockPersister.Run. Co-authored-by: Cursor <cursoragent@cursor.com>
Also add TODO for retry on persistence failure. Co-authored-by: Cursor <cursoragent@cursor.com>
```go
// When persistence is disabled, publish immediately.
// When enabled, the persist goroutine publishes after writing to disk,
// so consensus won't advance until the CommitQC is durable.
if inner.nextBlockToPersist == nil {
```
Don't compare to nil; either make nextBlockToPersist an Option, or check for the presence of the persisters field instead.
alternatively, you can move branching to the persisters logic - i.e. have a dummy persister task which will just bump nextBlockToPersist and latestCommitQC without persisting anything - it would make the control flow in tests and in prod more similar imo.
Added dummy persisters.
Moved DeleteBefore after AppQC persist so a crash between the two never leaves the on-disk AppQC pointing at a deleted CommitQC. Extracted the persist loop into its own runPersist method. Made-with: Cursor
Instead of conditionally spawning the persist goroutine and checking nextBlockToPersist == nil in PushCommitQC and RecvBatch, always run the persist goroutine with either real or no-op persisters. The no-op persisters skip disk I/O but still track cursors, making the test and production code paths identical. Made-with: Cursor
Instead of batching all block writes and updating nextBlockToPersist once at the end, persist each block individually and advance the cursor after each fsync. This makes vote latency equal to single-block write time regardless of backlog, preventing the positive feedback loop where slow persistence causes ever-growing batches. Also moves persistence cursors (nextBlockToPersist, nextCommitQCToPersist) into inner state, read directly by collectPersistBatch under lock. Made-with: Cursor
Made-with: Cursor
```go
}
first := bs[0].Number
q.reset(first)
for _, b := range bs {
```
nit: for consistency with populating CommitQCs, you might want to check the block numbers here as well. Maybe also put parent hash checking here?
```go
import (
	"testing"

	"github.com/stretchr/testify/require"
```

```go
}

func TestQueueReset(t *testing.T) {
	q := newQueue[uint64, string]()
```
add a test for reset() after populating the queue.
```go
const innerFile = "avail_inner"

// loadPersistedState loads persisted avail state from disk and creates persisters for ongoing writes.
func loadPersistedState(dir string) (*loadedAvailState, persisters, error) {
```
nit: for consistency, you might want to accept dir Option[string] and pass it to each persister constructor, which will act as a noop in case no dir is provided (given that you have added a noop mode to each of them anyway).
nit: the caveat with how noop mode currently works is that it is enabled for each persister separately. This is fine as long as the fields of the persisters type are private, so that it is not possible to set a combination of noop and real persisters (which would result in an invalid avail State on restart). Just an FYI.
```go
	}
	s.markBlockPersisted(h.Lane(), h.BlockNumber()+1)
}
if err := pers.blocks.DeleteBefore(batch.laneFirsts); err != nil {
```
Deleting blocks before persisting the AppQC may lead to a weird avail State, where we have a gap between the AppQC and the first persisted block of the lane. I don't see any obvious reason why it would be a bug in the consensus protocol, but gaplessness is a nice invariant to have.
moved all deletion to after AppQC is persisted
```go
if err := pers.commitQCs.DeleteBefore(batch.commitQCFirst); err != nil {
	return fmt.Errorf("commitqc deleteBefore: %w", err)
}
s.markCommitQCsPersisted(commitQCCur, utils.Some(batch.commitQCs[len(batch.commitQCs)-1]))
```
nit: that can happen just after persisting commitQCs. CommitQCCur seems redundant - NextOpt(persistedCommitQC) should do.
```go
appQC         utils.Option[*types.AppQC]
laneFirsts    map[types.LaneID]types.BlockNumber
commitQCFirst types.RoadIndex
commitQCCur   types.RoadIndex // snapshot of nextCommitQCToPersist (clamped)
```
nit: commitQCNext for consistency?
```go
for {
	for lane, bq := range inner.blocks {
		for i := max(bq.first, r.next[lane]); i < bq.next; i++ {
			upperBound := min(bq.next, inner.nextBlockToPersist[lane])
```
nit: add a TODO that nextBlockToPersist might deserve a separate Watch, to avoid waking too many tasks too often (a potential optimization).
```go
// PersistBatch persists a batch of blocks to disk, updates tips after each
// successful write, and cleans up old files below laneFirsts.
// Returns the updated tips snapshot.
func (bp *BlockPersister) PersistBatch(
```

```go
sorted := slices.Sorted(maps.Keys(bs))
var contiguous []LoadedBlock
for i, n := range sorted {
	if i > 0 && n != sorted[i-1]+1 {
```
I think here we should load the LAST contiguous range, no? Actually, this is more complicated. We need pruning to be consistent across the CommitQCs, blocks, and AppQC. In particular, which range should be loaded depends on the AppQC. Also, blocks that do not belong to the relevant range should be pruned, so that they are not loaded later by accident. Say we have a gap, but during loading we find that we did not persist the AppQC for the last range, so we load the earlier range. Then, after a while, we persist the blocks from the gap and restart again. Suddenly there is no gap any more; however, the old blocks after the gap do not match the blocks before the gap.
also, since persisting may fail before AppQC is persisted, we might end up with a block range which exceeds the lane capacity - this also needs to be taken care of during loading.
```go
	require.NoError(t, utils.TestDiff(b0, blocks[lane][0].Proposal))
}

func TestLoadCorruptMidSequenceTruncatesAtGap(t *testing.T) {
```
If it is not too complicated, it would be nice to have a test that makes avail.State generate a gap (i.e. persist some data, then suddenly receive future data, then persist that) and then restarts the state from it, ensuring that all the latest data is actually loaded back.
```go
// PersistCommitQC writes a CommitQC to its own file.
func (cp *CommitQCPersister) PersistCommitQC(qc *types.CommitQC) error {
	idx := qc.Index()
```
nit: you might want to add a check that qc.Index() >= cp.next and return an error otherwise.
```go
var contiguous []LoadedCommitQC
for i, idx := range sorted {
	if i > 0 && idx != sorted[i-1]+1 {
```
ditto, we need the LAST contiguous range.
…tors private

Each persister constructor (NewPersister, NewBlockPersister, NewCommitQCPersister) now accepts utils.Option[string] and returns a no-op implementation when None is passed. The previously-public no-op constructors are now private, ensuring they can only be created through the unified constructor path. This simplifies avail.NewState to a single loadPersistedState(stateDir) call with no branching, and guarantees all persisters are atomically either all-real or all-noop. Also adds block number/parent hash checks during inner block restoration and a queue reset-after-populate test.

Made-with: Cursor
…CNext

Block DeleteBefore now happens after the AppQC is persisted, matching the existing CommitQC deletion ordering. This prevents a crash from leaving a gap between the on-disk AppQC and the first persisted block. Also renamed commitQCCur to commitQCNext for consistency with the "next" naming convention used elsewhere (queue.next, nextCommitQCToPersist).

Made-with: Cursor
BlockPersister.PersistBatch, LoadTips, and the tips AtomicSend field are unused now that runPersist drives persistence via PersistBlock + markBlockPersisted directly. Removed all three. Added an order check in PersistCommitQC: returns error if idx < next (caller bug). Fixed stale comment referencing AtomicSend tips. Made-with: Cursor
…ke-ups Made-with: Cursor
latestCommitQC is now only written by markCommitQCsPersisted (after disk write) and on startup. The Store() in prune() is removed so that PushAppQC fast-forward no longer advances the cursor past what's actually persisted. collectPersistBatch derives the cursor via NextIndexOpt(latestCommitQC) with a max clamp against commitQCs.first to handle queue jumps from prune(). Made-with: Cursor
```go
for lane, q := range inner.blocks {
	if inner.nextBlockToPersist[lane] < q.next {
		return true
	}
}
```
CodeQL check warning: Iteration over map (Go's map iteration order is nondeterministic).
```go
for lane, q := range inner.blocks {
	start := max(inner.nextBlockToPersist[lane], q.first)
	for n := start; n < q.next; n++ {
		b.blocks = append(b.blocks, q.q[n])
	}
	b.laneFirsts[lane] = q.first
}
```
CodeQL check warning: Iteration over map (Go's map iteration order is nondeterministic).
Summary
Crash-safe persistence for availability state (AppQC, signed lane proposals, and CommitQCs). All I/O is fully asynchronous: no disk operations on the critical path or under locks.

- `consensus/persist/` sub-package: Generic A/B file persistence (`Persister[T]`) with crash-safe alternating writes. `BlockPersister` manages per-block files in a `blocks/` subdirectory. `CommitQCPersister` manages per-road-index CommitQC files. No-op implementations (`NewNoOpPersister`, `NewNoOpBlockPersister`, `NewNoOpCommitQCPersister`) for test/disabled paths.
- `Persister[T proto.Message]` interface: Strongly-typed persistence API; the concrete `abPersister[T]` handles the A/B file strategy. `NewNoOpPersister[T]` silently discards writes. A/B suffixes are unexported; `WriteRawFile` is a helper for corruption tests.
- Block persistence (`persist/blocks.go`): Each signed lane proposal is stored as `<lane_hex>_<blocknum>.pb`. On load, blocks are sorted and truncated at the first gap. `PersistBatch` encapsulates the I/O: persist blocks, update tips, clean up old files. When noop, all disk I/O is skipped but cursor/tips tracking still works.
- CommitQC persistence (`persist/commitqcs.go`): Each CommitQC is stored as `commitqc_<roadindex>.pb`. On load, QCs are sorted and truncated at the first gap. Needed for reconstructing FullCommitQCs on restart. When noop, disk I/O is skipped but `next` index tracking still works.
- The persist goroutine reads `inner` state directly (the in-memory state is the queue): `collectPersistBatch` acquires the lock, waits for new data, reads the persistence cursors (`inner.nextBlockToPersist`, `inner.nextCommitQCToPersist`) directly from inner state, clamps past pruned entries, and collects the batch. I/O runs with no lock held. No channel, no backpressure. Write order: blocks, then CommitQCs, then AppQC, then deletion of old CommitQCs. This guarantees a matching CommitQC is always on disk before its AppQC, and that the AppQC is persisted before old CommitQCs are deleted (crash-safe ordering).
- Persistence cursors in `inner` state: `nextBlockToPersist` (per-lane) and `nextCommitQCToPersist` track how far persistence has progressed. Always initialized; the no-op persist goroutine bumps them immediately. Reconstructed from disk on restart. `collectPersistBatch` reads these cursors under lock (no parameter passing); `markPersisted` updates them after successful I/O.
- Prune race: `prune` can delete map entries. Cursors are clamped to `q.first` before reading to prevent a nil pointer dereference. A regression test (`TestStateWithPersistence`) reliably catches this.
- No-op mode: When persistence is disabled (`stateDir` is None), `newNoOpPersisters()` creates no-op implementations for all three persisters. The persist goroutine always runs (same code path as production); it just skips disk I/O and immediately bumps cursors. This eliminates all `nil` checks for disabled persistence and ensures tests exercise the exact same control flow as production.
- `persisters` struct, pure I/O, always initialized: Groups the three disk persisters (`appQC`, `blocks`, `commitQCs`). Always present on `State` (not wrapped in `Option`). All inner state access goes through `State` methods (`collectPersistBatch`, `markPersisted`), keeping a clean separation between orchestration and I/O.
- Durable CommitQC publication: `PushCommitQC` no longer publishes `latestCommitQC` directly; the persist goroutine publishes it after writing to disk (or immediately for no-op persisters) via `markPersisted`. Since consensus subscribes to `LastCommitQC()` to advance views, it won't proceed until the CommitQC is durable, preventing CommitQC loss on crash.
- Vote gating (`avail/subscriptions.go`): `RecvBatch` only yields blocks below the `nextBlockToPersist` watermark, so votes are only signed for durably written blocks.
- Wiring (`avail/state.go`): `NewState` accepts `stateDir`, initialises the A/B persister (for the AppQC), `BlockPersister`, and `CommitQCPersister`, and loads persisted data on restart. Returns an error for corrupt state.
- Restoration (`avail/inner.go`): Uses `queue.reset()` to set starting positions, then `pushBack` to reload entries. Finally calls `inner.prune()` with the persisted AppQC to advance all queues (same code path as runtime). Returns an error for corrupt state (non-consecutive CommitQCs, AppQC without a matching CommitQC on disk).
- Cleanup: `DeleteBefore` removes files for pruned blocks and orphaned lanes (from previous committees). Driven by the persist goroutine observing `laneFirsts`.
- `prune()` returns `(bool, error)`: Simplified from returning a `laneFirsts` map; callers only need to know whether pruning occurred. The persist goroutine reads `q.first` directly.
- `newState` constructors: `consensus/state.go` exposes `newState()` accepting a custom `Persister` for test mocks (e.g. `failPersister`), avoiding fragile field mutation after construction. `avail/state.go`'s `NewState` accepts `stateDir` as `Option[string]`.
- `queue.reset()` method: Clearly sets the starting position of an empty queue, replacing misleading `prune()` calls during initialization.
- Known risk: a larger backlog makes `PersistBatch` slower, which delays `nextBlockToPersist` and `RecvBatch`, which delays voting, which grows batches further (a positive feedback loop). Mitigation (e.g. persisting one block at a time) deferred.

Ref: sei-protocol/sei-v3#512
Test plan
- `persist/blocks_test.go`: load/store, gap truncation, DeleteBefore, orphaned lane cleanup, header mismatch, corrupt files
- `persist/commitqcs_test.go`: load/store, gap truncation, DeleteBefore, corrupt files, mismatched index
- `persist/persist_test.go`: A/B file crash safety, seq management, corrupt fallback, generic typed API
- `avail/state_test.go`: fresh start, load AppQC, load blocks, load both, load commitQCs, load commitQCs with AppQC, corrupt data, headers returns ErrPruned for blocks before the loaded range
- `avail/state_test.go` (`TestStateWithPersistence`): end-to-end persist + prune race regression test (5 iterations with disk persistence; reliably catches the cursor race without the clamp fix)
- `avail/inner_test.go`: newInner with loaded state, newInner with all three (AppQC + CommitQCs + blocks), newInner error cases (non-consecutive CommitQCs, AppQC without matching CommitQC), nextBlockToPersist reconstruction, nextCommitQCToPersist reconstruction, votes queue advancement
- `avail/queue_test.go`: newQueue, pushBack, reset, prune, stale prune, prune past next
- `consensus/inner_test.go`: consensus inner persistence round-trip, persist error propagation via newState injection
- `data/state_test.go`: data state tests