From 954a74ae9ec579071b6a8fc7a3ad7d09c0974128 Mon Sep 17 00:00:00 2001 From: Alex Peters Date: Tue, 31 Mar 2026 17:53:38 +0200 Subject: [PATCH 1/4] Publisher-mode synchronization option for failover scenario --- CHANGELOG.md | 1 + apps/evm/go.mod | 8 +- apps/evm/go.sum | 4 - block/internal/syncing/syncer.go | 21 ++++- block/internal/syncing/syncer_test.go | 116 +++++++++++++++++++++++ docs/guides/raft_production.md | 30 +++--- docs/learn/config.md | 12 ++- node/failover.go | 47 +++++++++- pkg/config/config.go | 4 +- pkg/config/config_test.go | 12 +++ pkg/raft/election.go | 58 ++++++++---- pkg/raft/election_test.go | 3 + pkg/raft/node.go | 7 +- pkg/raft/node_test.go | 6 ++ pkg/sync/sync_service.go | 93 ++++++++++++------- pkg/sync/sync_service_test.go | 73 ++++++++++++++- pkg/sync/syncer_status.go | 42 ++++++++- pkg/sync/syncer_status_test.go | 128 ++++++++++++++++++++++++++ test/e2e/failover_e2e_test.go | 110 ++++++++++++++-------- test/e2e/sut_helper.go | 27 ++++-- 20 files changed, 671 insertions(+), 131 deletions(-) create mode 100644 pkg/sync/syncer_status_test.go diff --git a/CHANGELOG.md b/CHANGELOG.md index 2be55c06f1..defc357d1f 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -11,6 +11,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ### Changes +* Added publisher-mode synchronization option for failover scenarios with early P2P infrastructure readiness [#3222](https://github.com/evstack/ev-node/pull/3222) * Improve execution/evm check for stored meta not stale [#3221](https://github.com/evstack/ev-node/pull/3221) ## v1.1.0-rc.1 diff --git a/apps/evm/go.mod b/apps/evm/go.mod index edd9c14311..cdce2d0ee3 100644 --- a/apps/evm/go.mod +++ b/apps/evm/go.mod @@ -2,10 +2,10 @@ module github.com/evstack/ev-node/apps/evm go 1.25.7 -// replace ( -// github.com/evstack/ev-node => ../../ -// github.com/evstack/ev-node/execution/evm => ../../execution/evm -// ) +replace ( + github.com/evstack/ev-node => ../../ + github.com/evstack/ev-node/execution/evm => ../../execution/evm +) require ( github.com/ethereum/go-ethereum v1.17.2 diff --git a/apps/evm/go.sum b/apps/evm/go.sum index 239a59c985..e0249473d4 100644 --- a/apps/evm/go.sum +++ b/apps/evm/go.sum @@ -472,12 +472,8 @@ github.com/ethereum/go-bigmodexpfix v0.0.0-20250911101455-f9e208c548ab h1:rvv6MJ github.com/ethereum/go-bigmodexpfix v0.0.0-20250911101455-f9e208c548ab/go.mod h1:IuLm4IsPipXKF7CW5Lzf68PIbZ5yl7FFd74l/E0o9A8= github.com/ethereum/go-ethereum v1.17.2 h1:ag6geu0kn8Hv5FLKTpH+Hm2DHD+iuFtuqKxEuwUsDOI= github.com/ethereum/go-ethereum v1.17.2/go.mod h1:KHcRXfGOUfUmKg51IhQ0IowiqZ6PqZf08CMtk0g5K1o= -github.com/evstack/ev-node v1.1.0-rc.1 h1:NtPuuDLqN2h4/edu5zxRlZAxmLkTG3ncXBO2PlCDvVs= -github.com/evstack/ev-node v1.1.0-rc.1/go.mod h1:6rhWWzuyiqNn/erDmWCk1aLxUuQphyOGIRq56/smSyk= github.com/evstack/ev-node/core v1.0.0 h1:s0Tx0uWHme7SJn/ZNEtee4qNM8UO6PIxXnHhPbbKTz8= github.com/evstack/ev-node/core v1.0.0/go.mod h1:n2w/LhYQTPsi48m6lMj16YiIqsaQw6gxwjyJvR+B3sY= -github.com/evstack/ev-node/execution/evm v1.0.0 h1:UTAdCrnPsLoGzSgsBx4Kv76jkXpMmHBIpNv3MxyzWPo= -github.com/evstack/ev-node/execution/evm v1.0.0/go.mod h1:UrqkiepfTMiot6M8jnswgu3VU8SSucZpaMIHIl22/1A= github.com/fatih/color v1.7.0/go.mod h1:Zm6kSWBoL9eyXnKyktHP6abPY2pDugNf5KwzbycvMj4= github.com/fatih/color v1.10.0/go.mod h1:ELkj/draVOlAH/xkhN6mQ50Qd0MPOk5AAr3maGEBuJM= github.com/fatih/color v1.13.0/go.mod h1:kLAiJbzzSOZDVNGyDpeOxJ47H46qBXwg5ILebYFFOfk= diff --git a/block/internal/syncing/syncer.go b/block/internal/syncing/syncer.go index 
73bd9b6cbf..37759683e1 100644 --- a/block/internal/syncing/syncer.go +++ b/block/internal/syncing/syncer.go @@ -694,6 +694,12 @@ var ( // TrySyncNextBlock attempts to sync the next available block // the event is always the next block in sequence as processHeightEvent ensures it. func (s *Syncer) TrySyncNextBlock(ctx context.Context, event *common.DAHeightEvent) error { + return s.trySyncNextBlockWithState(ctx, event, s.getLastState()) +} + +// trySyncNextBlockWithState attempts to sync the next available block using +// the provided current state as the validation/apply baseline. +func (s *Syncer) trySyncNextBlockWithState(ctx context.Context, event *common.DAHeightEvent, currentState types.State) error { select { case <-ctx.Done(): return ctx.Err() @@ -703,7 +709,6 @@ func (s *Syncer) TrySyncNextBlock(ctx context.Context, event *common.DAHeightEve header := event.Header data := event.Data nextHeight := event.Header.Height() - currentState := s.getLastState() headerHash := header.Hash().String() s.logger.Info().Uint64("height", nextHeight).Str("source", string(event.Source)).Msg("syncing block") @@ -1189,6 +1194,7 @@ func (s *Syncer) RecoverFromRaft(ctx context.Context, raftState *raft.RaftBlockS } currentState := s.getLastState() + stateBootstrapped := false // Defensive: if lastState is not yet initialized (e.g., RecoverFromRaft called before Start), // load it from the store to ensure we have valid state for validation. @@ -1201,8 +1207,10 @@ func (s *Syncer) RecoverFromRaft(ctx context.Context, raftState *raft.RaftBlockS s.logger.Debug().Err(err).Msg("no state in store, using genesis defaults for recovery") currentState = types.State{ ChainID: s.genesis.ChainID, + InitialHeight: s.genesis.InitialHeight, LastBlockHeight: s.genesis.InitialHeight - 1, } + stateBootstrapped = true } } @@ -1214,11 +1222,18 @@ func (s *Syncer) RecoverFromRaft(ctx context.Context, raftState *raft.RaftBlockS return nil } else if currentState.LastBlockHeight+1 == raftState.Height { // raft is 1 block ahead // apply block - err := s.TrySyncNextBlock(ctx, &common.DAHeightEvent{ + event := &common.DAHeightEvent{ Header: &header, Data: &data, Source: "", - }) + } + err := s.trySyncNextBlockWithState(ctx, event, currentState) + if err != nil && stateBootstrapped && errors.Is(err, errInvalidState) { + s.logger.Debug().Err(err).Msg("raft recovery failed after bootstrap state init, retrying once") + // Keep strict validation semantics; this retry only guards startup ordering races. 
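+ // Seeding lastState with the bootstrapped genesis defaults gives the retry a consistent validation baseline.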
+ s.SetLastState(currentState) + err = s.trySyncNextBlockWithState(ctx, event, currentState) + } if err != nil { return err } diff --git a/block/internal/syncing/syncer_test.go b/block/internal/syncing/syncer_test.go index c6e03958bb..1ff2ad35fc 100644 --- a/block/internal/syncing/syncer_test.go +++ b/block/internal/syncing/syncer_test.go @@ -26,6 +26,7 @@ import ( "github.com/evstack/ev-node/pkg/config" datypes "github.com/evstack/ev-node/pkg/da/types" "github.com/evstack/ev-node/pkg/genesis" + "github.com/evstack/ev-node/pkg/raft" signerpkg "github.com/evstack/ev-node/pkg/signer" "github.com/evstack/ev-node/pkg/signer/noop" "github.com/evstack/ev-node/pkg/store" @@ -306,6 +307,121 @@ func TestSequentialBlockSync(t *testing.T) { requireEmptyChan(t, errChan) } +func TestSyncer_RecoverFromRaft_BootstrapsStateWhenUninitialized(t *testing.T) { + ds := dssync.MutexWrap(datastore.NewMapDatastore()) + st := store.New(ds) + + cm, err := cache.NewManager(config.DefaultConfig(), st, zerolog.Nop()) + require.NoError(t, err) + + addr, pub, signer := buildSyncTestSigner(t) + gen := genesis.Genesis{ + ChainID: "1234", + InitialHeight: 1, + StartTime: time.Now().Add(-time.Second), + ProposerAddress: addr, + } + + mockExec := testmocks.NewMockExecutor(t) + mockHeaderStore := extmocks.NewMockStore[*types.P2PSignedHeader](t) + mockDataStore := extmocks.NewMockStore[*types.P2PData](t) + s := NewSyncer( + st, + mockExec, + nil, + cm, + common.NopMetrics(), + config.DefaultConfig(), + gen, + mockHeaderStore, + mockDataStore, + zerolog.Nop(), + common.DefaultBlockOptions(), + make(chan error, 1), + nil, + ) + + // lastState intentionally not initialized to simulate recovery-before-start path. + data := makeData(gen.ChainID, 1, 0) + headerBz, hdr := makeSignedHeaderBytes(t, gen.ChainID, 1, addr, pub, signer, []byte("app0"), data, nil) + dataBz, err := data.MarshalBinary() + require.NoError(t, err) + + mockExec.EXPECT(). + ExecuteTxs(mock.Anything, mock.Anything, uint64(1), mock.Anything, mock.Anything). + Return([]byte("app1"), nil). + Once() + + err = s.RecoverFromRaft(t.Context(), &raft.RaftBlockState{ + Height: 1, + Hash: hdr.Hash(), + Header: headerBz, + Data: dataBz, + }) + require.NoError(t, err) + + state, err := st.GetState(t.Context()) + require.NoError(t, err) + require.Equal(t, gen.ChainID, state.ChainID) + require.Equal(t, uint64(1), state.LastBlockHeight) +} + +func TestSyncer_RecoverFromRaft_KeepsStrictValidationAfterStateExists(t *testing.T) { + ds := dssync.MutexWrap(datastore.NewMapDatastore()) + st := store.New(ds) + + cm, err := cache.NewManager(config.DefaultConfig(), st, zerolog.Nop()) + require.NoError(t, err) + + addr, pub, signer := buildSyncTestSigner(t) + gen := genesis.Genesis{ + ChainID: "1234", + InitialHeight: 1, + StartTime: time.Now().Add(-time.Second), + ProposerAddress: addr, + } + + mockExec := testmocks.NewMockExecutor(t) + mockHeaderStore := extmocks.NewMockStore[*types.P2PSignedHeader](t) + mockDataStore := extmocks.NewMockStore[*types.P2PData](t) + s := NewSyncer( + st, + mockExec, + nil, + cm, + common.NopMetrics(), + config.DefaultConfig(), + gen, + mockHeaderStore, + mockDataStore, + zerolog.Nop(), + common.DefaultBlockOptions(), + make(chan error, 1), + nil, + ) + + // Non-empty state must remain strictly validated. 
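+ // A mismatching chain ID forces recovery to fail strict validation rather than silently overwrite existing state.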
+ s.SetLastState(types.State{ + ChainID: "wrong-chain", + InitialHeight: 1, + LastBlockHeight: 0, + }) + + data := makeData(gen.ChainID, 1, 0) + headerBz, hdr := makeSignedHeaderBytes(t, gen.ChainID, 1, addr, pub, signer, []byte("app0"), data, nil) + dataBz, err := data.MarshalBinary() + require.NoError(t, err) + + err = s.RecoverFromRaft(t.Context(), &raft.RaftBlockState{ + Height: 1, + Hash: hdr.Hash(), + Header: headerBz, + Data: dataBz, + }) + require.Error(t, err) + require.ErrorContains(t, err, "invalid chain ID") +} + func TestSyncer_processPendingEvents(t *testing.T) { ds := dssync.MutexWrap(datastore.NewMapDatastore()) st := store.New(ds) diff --git a/docs/guides/raft_production.md b/docs/guides/raft_production.md index f2bb817e28..7789bf80ad 100644 --- a/docs/guides/raft_production.md +++ b/docs/guides/raft_production.md @@ -33,7 +33,7 @@ Raft is configured via CLI flags or the `config.toml` file under the `[raft]` (o | `--evnode.raft.raft_addr` | `raft.raft_addr` | TCP address for Raft transport. | `0.0.0.0:5001` (Bind to private IP) | | `--evnode.raft.raft_dir` | `raft.raft_dir` | Directory for Raft data. | `/data/raft` (Must be persistent) | | `--evnode.raft.peers` | `raft.peers` | Comma-separated list of peer addresses in format `nodeID@host:port`. | `node-1@10.0.0.1:5001,node-2@10.0.0.2:5001,node-3@10.0.0.3:5001` | -| `--evnode.raft.bootstrap` | `raft.bootstrap` | Bootstrap the cluster. **Required** for initial setup. | `true` (See Limitations) | +| `--evnode.raft.bootstrap` | `raft.bootstrap` | Compatibility flag. Startup mode is selected automatically from persisted raft configuration state. | optional | ### Timeout Tuning @@ -55,11 +55,15 @@ Ideally, a failover should complete within `2 * BlockTime` to minimize user impa ## Production Deployment Principles -### 1. Static Peering & Bootstrap -Current implementation requires **Bootstrap Mode** (`--evnode.raft.bootstrap=true`) for all nodes participating in the cluster initialization. -* **All nodes** should list the full set of peers in `--evnode.raft.peers`. +### 1. Static Peering & Automatic Startup Mode +Use static peering with automatic mode selection from local raft configuration: +* If local raft configuration already exists in `--evnode.raft.raft_dir`, the node starts in rejoin mode. +* If no local raft configuration exists yet, the node bootstraps from configured peers. +* `--evnode.raft.bootstrap` is retained for compatibility but does not control mode selection. +* **All configured cluster members** should list the full set of peers in `--evnode.raft.peers`. * The `peers` list format is strict: `NodeID@Host:Port`. -* **Limitation**: Dynamic addition of peers (Run-time Membership Changes) via RPC/CLI is not currently exposed. The cluster membership is static based on the initial bootstrap configuration. +* **Limitation**: Dynamic addition of peers (run-time membership changes) via RPC/CLI is not currently exposed. +* **Not supported**: Joining an existing cluster as a brand-new node that was not part of the initial static membership. ### 2. Infrastructure Requirements * **Encrypted Network (CRITICAL)**: Raft traffic is **unencrypted** (plain TCP). You **MUST** run the cluster inside a private network, VPN, or encrypted mesh (e.g., WireGuard, Tailscale). **Never expose Raft ports to the public internet**; doing so allows attackers to hijack the cluster consensus. 
@@ -86,13 +90,13 @@ Monitor the following metrics (propagated via Prometheus if enabled): ```bash ./ev-node start \ - --node.aggregator \ - --raft.enable \ - --raft.node_id="node-1" \ - --raft.raft_addr="0.0.0.0:5001" \ - --raft.raft_dir="/var/lib/ev-node/raft" \ - --raft.bootstrap=true \ - --raft.peers="node-1@10.0.1.1:5001,node-2@10.0.1.2:5001,node-3@10.0.1.3:5001" \ - --p2p.listen_address="/ip4/0.0.0.0/tcp/26656" \ + --rollkit.node.aggregator=true \ + --evnode.raft.enable=true \ + --evnode.raft.node_id="node-1" \ + --evnode.raft.raft_addr="0.0.0.0:5001" \ + --evnode.raft.raft_dir="/var/lib/ev-node/raft" \ + --evnode.raft.bootstrap=true \ + --evnode.raft.peers="node-1@10.0.1.1:5001,node-2@10.0.1.2:5001,node-3@10.0.1.3:5001" \ + --rollkit.p2p.listen_address="/ip4/0.0.0.0/tcp/26656" \ ...other flags ``` diff --git a/docs/learn/config.md b/docs/learn/config.md index 52b5dfb861..26ba5d2092 100644 --- a/docs/learn/config.md +++ b/docs/learn/config.md @@ -1321,7 +1321,7 @@ _Constant:_ `FlagRaftDir` ### Raft Bootstrap **Description:** -If true, bootstraps a new Raft cluster. Only set this on the very first node when initializing a new cluster. +Legacy compatibility flag. Startup mode is now auto-selected from persisted raft configuration state, so this flag is not used to choose bootstrap vs rejoin. **YAML:** @@ -1352,6 +1352,16 @@ raft: _Default:_ `""` (empty) _Constant:_ `FlagRaftPeers` +### Raft Startup Mode + +Raft startup mode is selected automatically from local raft configuration state: + +* If the node already has persisted raft configuration in `raft.raft_dir`, it starts in rejoin mode. +* If no raft configuration exists yet, it bootstraps a cluster from configured peers. +* `raft.bootstrap` is retained for compatibility but does not control mode selection. + +`--evnode.raft.rejoin` has been removed. + ### Raft Snap Count **Description:** diff --git a/node/failover.go b/node/failover.go index c60e27a5f4..42dac4e8bc 100644 --- a/node/failover.go +++ b/node/failover.go @@ -33,6 +33,8 @@ type failoverState struct { dataSyncService *evsync.DataSyncService rpcServer *http.Server bc *block.Components + raftNode *raft.Node + isAggregator bool // catchup fields — used when the aggregator needs to sync before producing catchupEnabled bool @@ -172,6 +174,8 @@ func setupFailoverState( dataSyncService: dataSyncService, rpcServer: rpcServer, bc: bc, + raftNode: raftNode, + isAggregator: isAggregator, store: rktStore, catchupEnabled: catchupEnabled, catchupTimeout: nodeConfig.Node.CatchupTimeout.Duration, @@ -179,6 +183,32 @@ func setupFailoverState( }, nil } +// shouldStartSyncInPublisherMode avoids startup deadlock when a raft leader boots +// with empty sync stores and no peer can serve height 1 yet. +func (f *failoverState) shouldStartSyncInPublisherMode(ctx context.Context) bool { + if !f.isAggregator || f.raftNode == nil || !f.raftNode.IsLeader() { + return false + } + + storeHeight, err := f.store.Height(ctx) + if err != nil { + f.logger.Warn().Err(err).Msg("cannot determine store height; keeping blocking sync startup") + return false + } + headerHeight := f.headerSyncService.Store().Height() + dataHeight := f.dataSyncService.Store().Height() + if headerHeight > 0 || dataHeight > 0 { + return false + } + + f.logger.Info(). + Uint64("store_height", storeHeight). + Uint64("header_height", headerHeight). + Uint64("data_height", dataHeight). 
+ Msg("raft-enabled aggregator with empty sync stores: starting sync services in publisher mode") + return true +} + func (f *failoverState) Run(pCtx context.Context) (multiErr error) { stopService := func(stoppable func(context.Context) error, name string) { //nolint:contextcheck // shutdown uses context.Background intentionally // parent context is cancelled already, so we need to create a new one @@ -207,15 +237,28 @@ func (f *failoverState) Run(pCtx context.Context) (multiErr error) { }) // start header and data sync services concurrently to avoid cumulative startup delay. + startSyncInPublisherMode := f.shouldStartSyncInPublisherMode(ctx) syncWg, syncCtx := errgroup.WithContext(ctx) syncWg.Go(func() error { - if err := f.headerSyncService.Start(syncCtx); err != nil { + var err error + if startSyncInPublisherMode { + err = f.headerSyncService.StartForPublishing(syncCtx) + } else { + err = f.headerSyncService.Start(syncCtx) + } + if err != nil { return fmt.Errorf("header sync service: %w", err) } return nil }) syncWg.Go(func() error { - if err := f.dataSyncService.Start(syncCtx); err != nil { + var err error + if startSyncInPublisherMode { + err = f.dataSyncService.StartForPublishing(syncCtx) + } else { + err = f.dataSyncService.Start(syncCtx) + } + if err != nil { return fmt.Errorf("data sync service: %w", err) } return nil diff --git a/pkg/config/config.go b/pkg/config/config.go index 593f815540..09e85f3e20 100644 --- a/pkg/config/config.go +++ b/pkg/config/config.go @@ -400,7 +400,7 @@ type RaftConfig struct { NodeID string `mapstructure:"node_id" yaml:"node_id" comment:"Unique identifier for this node in the Raft cluster"` RaftAddr string `mapstructure:"raft_addr" yaml:"raft_addr" comment:"Address for Raft communication (host:port)"` RaftDir string `mapstructure:"raft_dir" yaml:"raft_dir" comment:"Directory for Raft logs and snapshots"` - Bootstrap bool `mapstructure:"bootstrap" yaml:"bootstrap" comment:"Bootstrap a new Raft cluster (only for the first node)"` + Bootstrap bool `mapstructure:"bootstrap" yaml:"bootstrap" comment:"Bootstrap a new static Raft cluster during initial bring-up"` Peers string `mapstructure:"peers" yaml:"peers" comment:"Comma-separated list of peer Raft addresses (nodeID@host:port)"` SnapCount uint64 `mapstructure:"snap_count" yaml:"snap_count" comment:"Number of log entries between snapshots"` SendTimeout time.Duration `mapstructure:"send_timeout" yaml:"send_timeout" comment:"Max duration to wait for a message to be sent to a peer"` @@ -646,7 +646,7 @@ func AddFlags(cmd *cobra.Command) { cmd.Flags().String(FlagRaftNodeID, def.Raft.NodeID, "unique identifier for this node in the Raft cluster") cmd.Flags().String(FlagRaftAddr, def.Raft.RaftAddr, "address for Raft communication (host:port)") cmd.Flags().String(FlagRaftDir, def.Raft.RaftDir, "directory for Raft logs and snapshots") - cmd.Flags().Bool(FlagRaftBootstrap, def.Raft.Bootstrap, "bootstrap a new Raft cluster (only for the first node)") + cmd.Flags().Bool(FlagRaftBootstrap, def.Raft.Bootstrap, "bootstrap a new static Raft cluster during initial bring-up") cmd.Flags().String(FlagRaftPeers, def.Raft.Peers, "comma-separated list of peer Raft addresses (nodeID@host:port)") cmd.Flags().Uint64(FlagRaftSnapCount, def.Raft.SnapCount, "number of log entries between snapshots") cmd.Flags().Duration(FlagRaftSendTimeout, def.Raft.SendTimeout, "max duration to wait for a message to be sent to a peer") diff --git a/pkg/config/config_test.go b/pkg/config/config_test.go index 594114c770..cf556803c2 100644 --- 
a/pkg/config/config_test.go +++ b/pkg/config/config_test.go @@ -122,6 +122,18 @@ func TestAddFlags(t *testing.T) { assertFlagValue(t, flags, FlagRPCAddress, DefaultConfig().RPC.Address) assertFlagValue(t, flags, FlagRPCEnableDAVisualization, DefaultConfig().RPC.EnableDAVisualization) + // Raft flags + assertFlagValue(t, flags, FlagRaftEnable, DefaultConfig().Raft.Enable) + assertFlagValue(t, flags, FlagRaftNodeID, DefaultConfig().Raft.NodeID) + assertFlagValue(t, flags, FlagRaftAddr, DefaultConfig().Raft.RaftAddr) + assertFlagValue(t, flags, FlagRaftDir, DefaultConfig().Raft.RaftDir) + assertFlagValue(t, flags, FlagRaftBootstrap, DefaultConfig().Raft.Bootstrap) + assertFlagValue(t, flags, FlagRaftPeers, DefaultConfig().Raft.Peers) + assertFlagValue(t, flags, FlagRaftSnapCount, DefaultConfig().Raft.SnapCount) + assertFlagValue(t, flags, FlagRaftSendTimeout, DefaultConfig().Raft.SendTimeout) + assertFlagValue(t, flags, FlagRaftHeartbeatTimeout, DefaultConfig().Raft.HeartbeatTimeout) + assertFlagValue(t, flags, FlagRaftLeaderLeaseTimeout, DefaultConfig().Raft.LeaderLeaseTimeout) + // Pruning flags assertFlagValue(t, flags, FlagPruningMode, DefaultConfig().Pruning.Mode) assertFlagValue(t, flags, FlagPruningKeepRecent, DefaultConfig().Pruning.KeepRecent) diff --git a/pkg/raft/election.go b/pkg/raft/election.go index 21b6243a95..757f07ca27 100644 --- a/pkg/raft/election.go +++ b/pkg/raft/election.go @@ -64,6 +64,7 @@ func (d *DynamicLeaderElection) Run(ctx context.Context) error { close(errCh) }() + var runnable Runnable startWorker := func(name string, workerFunc func(ctx context.Context) error) { workerCancel() wg.Wait() // Ensure previous worker is fully stopped @@ -83,11 +84,26 @@ func (d *DynamicLeaderElection) Run(ctx context.Context) error { } }(workerCtx) } + startFollower := func() error { + var err error + if runnable, err = d.followerFactory(); err != nil { + return err + } + // avoids validating against stale raft state. 
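+ // The wait is bounded by SendTimeout; timing out is tolerated so a quiet cluster cannot stall follower startup.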
+ if err = d.node.waitForMsgsLanded(d.node.Config().SendTimeout); err != nil { + // this wait can legitimately time out + d.logger.Debug().Err(err).Msg("timed out waiting for raft messages before follower verification; continuing") + } + if err = d.verifyState(ctx, runnable); err != nil { + return err + } + startWorker("follower", runnable.Run) + return nil + } ticker := time.NewTicker(300 * time.Millisecond) defer ticker.Stop() d.running.Store(true) defer d.running.Store(false) - var runnable Runnable for { select { case becameLeader := <-d.node.leaderCh(): @@ -144,15 +160,9 @@ func (d *DynamicLeaderElection) Run(ctx context.Context) error { } else if !becameLeader && !isCurrentlyLeader && !isStarted { // start as a follower d.logger.Info().Msg("starting follower operations") isStarted = true - var err error - if runnable, err = d.followerFactory(); err != nil { + if err := startFollower(); err != nil { return err } - - if err = d.verifyState(ctx, runnable); err != nil { - return err - } - startWorker("follower", runnable.Run) } // LeaderCh fires only when leader changes not on initial election case <-ticker.C: @@ -171,11 +181,9 @@ func (d *DynamicLeaderElection) Run(ctx context.Context) error { d.logger.Info().Msg("starting follower operations") isStarted = true - var err error - if runnable, err = d.followerFactory(); err != nil { + if err := startFollower(); err != nil { return err } - startWorker("follower", runnable.Run) case err := <-errCh: return err case <-ctx.Done(): @@ -189,14 +197,32 @@ func (d *DynamicLeaderElection) verifyState(ctx context.Context, runnable Runnab // Verify sync state before starting follower operations raftState := d.node.GetState() if raftState == nil || raftState.Height == 0 { - // Initial/empty raft state - skip recovery and let normal sync handle it. - // This can happen during rolling restarts when the Raft FSM hasn't replayed logs yet. 
- d.logger.Info().Msg("raft state at height 0, skipping recovery to allow normal sync") - return nil + waitTimeout := d.node.Config().SendTimeout + deadline := time.NewTimer(waitTimeout) + defer deadline.Stop() + ticker := time.NewTicker(min(50*time.Millisecond, max(waitTimeout/4, time.Millisecond))) + defer ticker.Stop() + + for raftState == nil || raftState.Height == 0 { + select { + case <-ctx.Done(): + return ctx.Err() + case <-deadline.C: + d.logger.Info().Msg("raft state still at height 0 after wait; skipping recovery to allow normal sync") + return nil + case <-ticker.C: + raftState = d.node.GetState() + } + } } diff, err := runnable.IsSynced(raftState) if err != nil { - return err + d.logger.Warn().Err(err).Uint64("raft_height", raftState.Height).Msg("sync check failed, attempting recovery from raft canonical state") + if recErr := runnable.Recover(ctx, raftState); recErr != nil { + return errors.Join(err, fmt.Errorf("recovery after sync-check failure: %w", recErr)) + } + d.logger.Info().Msg("recovery successful after sync-check failure") + return nil } if diff == 0 { return nil diff --git a/pkg/raft/election_test.go b/pkg/raft/election_test.go index 4d2b65be97..b29cbda8d7 100644 --- a/pkg/raft/election_test.go +++ b/pkg/raft/election_test.go @@ -25,6 +25,9 @@ func TestDynamicLeaderElectionRun(t *testing.T) { m.EXPECT().leaderCh().Return((<-chan bool)(leaderCh)) m.EXPECT().leaderID().Return("other") m.EXPECT().NodeID().Return("self") + m.EXPECT().Config().Return(testCfg()) + m.EXPECT().waitForMsgsLanded(2 * time.Millisecond).Return(nil) + m.EXPECT().GetState().Return(&RaftBlockState{}) started := make(chan struct{}) follower := &testRunnable{startedCh: started} diff --git a/pkg/raft/node.go b/pkg/raft/node.go index ada6838560..f156ae4785 100644 --- a/pkg/raft/node.go +++ b/pkg/raft/node.go @@ -109,13 +109,8 @@ func (n *Node) Start(_ context.Context) error { if n == nil { return nil } - if !n.config.Bootstrap { - // it is intended to fail fast here. at this stage only bootstrap mode is supported. 
- return fmt.Errorf("raft cluster requires bootstrap mode") - } - if future := n.raft.GetConfiguration(); future.Error() == nil && len(future.Configuration().Servers) > 0 { - n.logger.Info().Msg("cluster already bootstrapped, skipping") + n.logger.Info().Msg("raft node started with existing local state") return nil } diff --git a/pkg/raft/node_test.go b/pkg/raft/node_test.go index 9e3f8237a5..67b5ea0392 100644 --- a/pkg/raft/node_test.go +++ b/pkg/raft/node_test.go @@ -1,6 +1,7 @@ package raft import ( + "context" "errors" "testing" @@ -108,3 +109,8 @@ func TestDeduplicateServers(t *testing.T) { }) } } + +func TestNodeStartNilNoop(t *testing.T) { + var node *Node + require.NoError(t, node.Start(context.Background())) +} diff --git a/pkg/sync/sync_service.go b/pkg/sync/sync_service.go index b65b855a4b..947a74c70b 100644 --- a/pkg/sync/sync_service.go +++ b/pkg/sync/sync_service.go @@ -143,12 +143,9 @@ func (syncService *SyncService[H]) WriteToStoreAndBroadcast(ctx context.Context, } } - firstStart := false - if !syncService.syncerStatus.started.Load() { - firstStart = true - if err := syncService.startSyncer(ctx); err != nil { - return fmt.Errorf("failed to start syncer after initializing the store: %w", err) - } + firstStart, err := syncService.startSyncer(ctx) + if err != nil { + return fmt.Errorf("failed to start syncer after initializing the store: %w", err) } // Broadcast for subscribers @@ -190,20 +187,9 @@ func (s *SyncService[H]) AppendDAHint(ctx context.Context, daHeight uint64, heig // Start is a part of Service interface. func (syncService *SyncService[H]) Start(ctx context.Context) error { - // setup P2P infrastructure, but don't start Subscriber yet. - peerIDs, err := syncService.setupP2PInfrastructure(ctx) + peerIDs, err := syncService.prepareStart(ctx) if err != nil { - return fmt.Errorf("failed to setup syncer P2P infrastructure: %w", err) - } - - // create syncer, must be before initFromP2PWithRetry which calls startSyncer. - if syncService.syncer, err = newSyncer( - syncService.ex, - syncService.store, - syncService.sub, - []goheadersync.Option{goheadersync.WithBlockTime(syncService.conf.Node.BlockTime.Duration)}, - ); err != nil { - return fmt.Errorf("failed to create syncer: %w", err) + return err } // initialize stores from P2P (blocking until genesis is fetched for followers) @@ -223,20 +209,61 @@ func (syncService *SyncService[H]) Start(ctx context.Context) error { return nil } -// startSyncer starts the SyncService's syncer -func (syncService *SyncService[H]) startSyncer(ctx context.Context) error { - if syncService.syncerStatus.isStarted() { - return nil +// StartForPublishing starts the sync service in publisher mode. +// +// This mode is used by a raft leader with an empty local store: no peer can serve +// height 1 yet, so waiting for initFromP2PWithRetry would deadlock block production. +// We still need the P2P exchange server and pubsub subscriber to be ready before the +// first block is produced, because WriteToStoreAndBroadcast relies on them to gossip +// the block that bootstraps the network. 
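+// Unlike Start, it does not block on initFromP2PWithRetry; the syncer itself is started lazily by the
+// first WriteToStoreAndBroadcast call once the leader produces a block.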
+func (syncService *SyncService[H]) StartForPublishing(ctx context.Context) error { + if _, err := syncService.prepareStart(ctx); err != nil { + return err } - if err := syncService.syncer.Start(ctx); err != nil { - return fmt.Errorf("failed to start syncer: %w", err) + if err := syncService.startSubscriber(ctx); err != nil { + return fmt.Errorf("failed to start subscriber: %w", err) } - syncService.syncerStatus.started.Store(true) return nil } +func (syncService *SyncService[H]) prepareStart(ctx context.Context) ([]peer.ID, error) { + // setup P2P infrastructure, but don't start Subscriber yet. + peerIDs, err := syncService.setupP2PInfrastructure(ctx) + if err != nil { + return nil, fmt.Errorf("failed to setup syncer P2P infrastructure: %w", err) + } + + // create syncer, must be before initFromP2PWithRetry which calls startSyncer. + if syncService.syncer, err = newSyncer( + syncService.ex, + syncService.store, + syncService.sub, + []goheadersync.Option{goheadersync.WithBlockTime(syncService.conf.Node.BlockTime.Duration)}, + ); err != nil { + return nil, fmt.Errorf("failed to create syncer: %w", err) + } + + return peerIDs, nil +} + +// startSyncer starts the SyncService's syncer. +// It returns true when this call performed the actual start. +func (syncService *SyncService[H]) startSyncer(ctx context.Context) (bool, error) { + startedNow, err := syncService.syncerStatus.startOnce(func() error { + if err := syncService.syncer.Start(ctx); err != nil { + return fmt.Errorf("failed to start syncer: %w", err) + } + return nil + }) + if err != nil { + return false, err + } + + return startedNow, nil +} + // initStore initializes the store with the given initial header. // it is a no-op if the store is already initialized. // Returns true when the store was initialized by this call. @@ -371,7 +398,7 @@ func (syncService *SyncService[H]) initFromP2PWithRetry(ctx context.Context, pee return false, fmt.Errorf("failed to initialize the store: %w", err) } } - if err := syncService.startSyncer(ctx); err != nil { + if _, err := syncService.startSyncer(ctx); err != nil { return false, err } return true, nil @@ -386,6 +413,8 @@ func (syncService *SyncService[H]) initFromP2PWithRetry(ctx context.Context, pee p2pInitTimeout := 30 * time.Second timeoutTimer := time.NewTimer(p2pInitTimeout) defer timeoutTimer.Stop() + retryTimer := time.NewTimer(backoff) + defer retryTimer.Stop() for { ok, err := tryInit(ctx) @@ -403,13 +432,13 @@ func (syncService *SyncService[H]) initFromP2PWithRetry(ctx context.Context, pee Dur("timeout", p2pInitTimeout). 
Msg("P2P header sync initialization timed out, deferring to DA sync") return nil - case <-time.After(backoff): + case <-retryTimer.C: } - backoff *= 2 if backoff > maxBackoff { backoff = maxBackoff } + retryTimer.Reset(backoff) } } @@ -424,9 +453,9 @@ func (syncService *SyncService[H]) Stop(ctx context.Context) error { syncService.ex.Stop(ctx), syncService.sub.Stop(ctx), ) - if syncService.syncerStatus.isStarted() { - err = errors.Join(err, syncService.syncer.Stop(ctx)) - } + err = errors.Join(err, syncService.syncerStatus.stopIfStarted(func() error { + return syncService.syncer.Stop(ctx) + })) // Stop the store adapter if it has a Stop method if stopper, ok := syncService.store.(interface{ Stop(context.Context) error }); ok { err = errors.Join(err, stopper.Stop(ctx)) diff --git a/pkg/sync/sync_service_test.go b/pkg/sync/sync_service_test.go index e8b1527f79..bf096cb6da 100644 --- a/pkg/sync/sync_service_test.go +++ b/pkg/sync/sync_service_test.go @@ -12,6 +12,7 @@ import ( "github.com/ipfs/go-datastore/sync" "github.com/libp2p/go-libp2p/core/crypto" mocknet "github.com/libp2p/go-libp2p/p2p/net/mock" + "github.com/multiformats/go-multiaddr" "github.com/rs/zerolog" "github.com/stretchr/testify/require" @@ -25,6 +26,76 @@ import ( "github.com/evstack/ev-node/types" ) +func TestHeaderSyncServiceStartForPublishingWithPeers(t *testing.T) { + mainKV := sync.MutexWrap(datastore.NewMapDatastore()) + pk, _, err := crypto.GenerateEd25519Key(cryptoRand.Reader) + require.NoError(t, err) + noopSigner, err := noop.NewNoopSigner(pk) + require.NoError(t, err) + rnd := rand.New(rand.NewSource(1)) // nolint:gosec // test code only + mn := mocknet.New() + + chainID := "test-chain-id" + genesisDoc := genesispkg.Genesis{ + ChainID: chainID, + StartTime: time.Now(), + InitialHeight: 1, + ProposerAddress: []byte("test"), + } + + conf := config.DefaultConfig() + conf.RootDir = t.TempDir() + logger := zerolog.Nop() + + nodeKey1, err := key.LoadOrGenNodeKey(filepath.Join(conf.RootDir, "node1_key.json")) + require.NoError(t, err) + host1, err := mn.AddPeer(nodeKey1.PrivKey, multiaddr.StringCast("/ip4/127.0.0.1/tcp/0")) + require.NoError(t, err) + + nodeKey2, err := key.LoadOrGenNodeKey(filepath.Join(conf.RootDir, "node2_key.json")) + require.NoError(t, err) + host2, err := mn.AddPeer(nodeKey2.PrivKey, multiaddr.StringCast("/ip4/127.0.0.1/tcp/0")) + require.NoError(t, err) + + require.NoError(t, mn.LinkAll()) + require.NoError(t, mn.ConnectAllButSelf()) + + client1, err := p2p.NewClientWithHost(conf.P2P, nodeKey1.PrivKey, mainKV, chainID, logger, p2p.NopMetrics(), host1) + require.NoError(t, err) + client2, err := p2p.NewClientWithHost(conf.P2P, nodeKey2.PrivKey, mainKV, chainID, logger, p2p.NopMetrics(), host2) + require.NoError(t, err) + + ctx, cancel := context.WithCancel(t.Context()) + defer cancel() + require.NoError(t, client1.Start(ctx)) + require.NoError(t, client2.Start(ctx)) + t.Cleanup(func() { _ = client1.Close() }) + t.Cleanup(func() { _ = client2.Close() }) + + require.Eventually(t, func() bool { + return len(client1.PeerIDs()) > 0 + }, time.Second, 10*time.Millisecond) + + evStore := store.New(mainKV) + svc, err := NewHeaderSyncService(evStore, conf, genesisDoc, client1, logger) + require.NoError(t, err) + require.NoError(t, svc.StartForPublishing(ctx)) + t.Cleanup(func() { _ = svc.Stop(context.Background()) }) + + headerConfig := types.HeaderConfig{ + Height: genesisDoc.InitialHeight, + DataHash: bytesN(rnd, 32), + AppHash: bytesN(rnd, 32), + Signer: noopSigner, + } + signedHeader, err := 
types.GetRandomSignedHeaderCustom(t.Context(), &headerConfig, genesisDoc.ChainID) + require.NoError(t, err) + require.NoError(t, signedHeader.Validate()) + + require.NoError(t, svc.WriteToStoreAndBroadcast(ctx, &types.P2PSignedHeader{SignedHeader: signedHeader})) + require.True(t, svc.storeInitialized.Load()) +} + func TestHeaderSyncServiceRestart(t *testing.T) { mainKV := sync.MutexWrap(datastore.NewMapDatastore()) pk, _, err := crypto.GenerateEd25519Key(cryptoRand.Reader) @@ -78,7 +149,7 @@ func TestHeaderSyncServiceRestart(t *testing.T) { require.NoError(t, signedHeader.Validate()) require.NoError(t, svc.WriteToStoreAndBroadcast(ctx, &types.P2PSignedHeader{SignedHeader: signedHeader})) - for i := genesisDoc.InitialHeight + 1; i < 2; i++ { + for i := genesisDoc.InitialHeight + 1; i < 10; i++ { signedHeader = nextHeader(t, signedHeader, genesisDoc.ChainID, noopSigner) t.Logf("signed header: %d", i) require.NoError(t, svc.WriteToStoreAndBroadcast(ctx, &types.P2PSignedHeader{SignedHeader: signedHeader})) diff --git a/pkg/sync/syncer_status.go b/pkg/sync/syncer_status.go index 1fe26008c0..70364f2762 100644 --- a/pkg/sync/syncer_status.go +++ b/pkg/sync/syncer_status.go @@ -1,13 +1,49 @@ package sync -import "sync/atomic" +import "sync" // SyncerStatus is used by header and block exchange service for keeping track // of the status of the syncer in them. type SyncerStatus struct { - started atomic.Bool + mu sync.Mutex + started bool } func (syncerStatus *SyncerStatus) isStarted() bool { - return syncerStatus.started.Load() + syncerStatus.mu.Lock() + defer syncerStatus.mu.Unlock() + + return syncerStatus.started +} + +func (syncerStatus *SyncerStatus) startOnce(startFn func() error) (bool, error) { + syncerStatus.mu.Lock() + defer syncerStatus.mu.Unlock() + + if syncerStatus.started { + return false, nil + } + + if err := startFn(); err != nil { + return false, err + } + + syncerStatus.started = true + return true, nil +} + +func (syncerStatus *SyncerStatus) stopIfStarted(stopFn func() error) error { + syncerStatus.mu.Lock() + defer syncerStatus.mu.Unlock() + + if !syncerStatus.started { + return nil + } + + if err := stopFn(); err != nil { + return err + } + + syncerStatus.started = false + return nil } diff --git a/pkg/sync/syncer_status_test.go b/pkg/sync/syncer_status_test.go new file mode 100644 index 0000000000..01ecbdf490 --- /dev/null +++ b/pkg/sync/syncer_status_test.go @@ -0,0 +1,128 @@ +package sync + +import ( + "errors" + "sync" + "sync/atomic" + "testing" + + "github.com/stretchr/testify/require" +) + +func TestSyncerStatusStartOnce(t *testing.T) { + t.Parallel() + + specs := map[string]struct { + run func(*testing.T, *SyncerStatus) + }{ + "concurrent_start_only_runs_once": { + run: func(t *testing.T, status *SyncerStatus) { + t.Helper() + + var calls atomic.Int32 + started := make(chan struct{}) + release := make(chan struct{}) + var wg sync.WaitGroup + + for range 8 { + wg.Add(1) + go func() { + defer wg.Done() + _, err := status.startOnce(func() error { + if calls.Add(1) == 1 { + close(started) + } + <-release + return nil + }) + require.NoError(t, err) + }() + } + + <-started + close(release) + wg.Wait() + + require.Equal(t, int32(1), calls.Load()) + require.True(t, status.isStarted()) + }, + }, + "failed_start_can_retry": { + run: func(t *testing.T, status *SyncerStatus) { + t.Helper() + + var calls atomic.Int32 + errBoom := errors.New("boom") + + startedNow, err := status.startOnce(func() error { + calls.Add(1) + return errBoom + }) + require.ErrorIs(t, err, errBoom) + 
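+ // A failed start must not latch the started flag; the next startOnce call should run the callback again.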
require.False(t, startedNow) + require.False(t, status.isStarted()) + + startedNow, err = status.startOnce(func() error { + calls.Add(1) + return nil + }) + require.NoError(t, err) + require.True(t, startedNow) + require.True(t, status.isStarted()) + require.Equal(t, int32(2), calls.Load()) + }, + }, + } + + for name, spec := range specs { + t.Run(name, func(t *testing.T) { + t.Parallel() + spec.run(t, &SyncerStatus{}) + }) + } +} + +func TestSyncerStatusStopIfStarted(t *testing.T) { + t.Parallel() + + specs := map[string]struct { + started bool + wantErr bool + }{ + "no_op_when_not_started": { + started: false, + wantErr: false, + }, + "stop_clears_started": { + started: true, + wantErr: false, + }, + } + + for name, spec := range specs { + t.Run(name, func(t *testing.T) { + t.Parallel() + + status := &SyncerStatus{started: spec.started} + var stopCalls atomic.Int32 + + err := status.stopIfStarted(func() error { + stopCalls.Add(1) + return nil + }) + + if spec.wantErr { + require.Error(t, err) + } else { + require.NoError(t, err) + } + + if spec.started { + require.Equal(t, int32(1), stopCalls.Load()) + } else { + require.Zero(t, stopCalls.Load()) + } + require.False(t, status.isStarted()) + }) + } +} diff --git a/test/e2e/failover_e2e_test.go b/test/e2e/failover_e2e_test.go index 30d689d00a..1e1f53dfd1 100644 --- a/test/e2e/failover_e2e_test.go +++ b/test/e2e/failover_e2e_test.go @@ -26,6 +26,7 @@ import ( ethtypes "github.com/ethereum/go-ethereum/core/types" "github.com/ethereum/go-ethereum/crypto" "github.com/ethereum/go-ethereum/ethclient" + "github.com/libp2p/go-libp2p/core/peer" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" "google.golang.org/protobuf/proto" @@ -34,6 +35,7 @@ import ( evmtest "github.com/evstack/ev-node/execution/evm/test" blobrpc "github.com/evstack/ev-node/pkg/da/jsonrpc" coreda "github.com/evstack/ev-node/pkg/da/types" + "github.com/evstack/ev-node/pkg/p2p/key" "github.com/evstack/ev-node/pkg/rpc/client" rpcclient "github.com/evstack/ev-node/pkg/rpc/client" "github.com/evstack/ev-node/types" @@ -82,38 +84,41 @@ func TestLeaseFailoverE2E(t *testing.T) { clusterNodes := &raftClusterNodes{ nodes: make(map[string]*nodeDetails), } - node1P2PAddr := env.Endpoints.GetRollkitP2PAddress() - node2P2PAddr := env.Endpoints.GetFullNodeP2PAddress() - node3P2PAddr := fmt.Sprintf("/ip4/127.0.0.1/tcp/%d", mustGetAvailablePort(t)) + node1P2PListen := env.Endpoints.GetRollkitP2PAddress() + node2P2PListen := env.Endpoints.GetFullNodeP2PAddress() + node3P2PListen := fmt.Sprintf("/ip4/127.0.0.1/tcp/%d", mustGetAvailablePort(t)) + node1P2PAddr := mustNodeP2PMultiAddr(t, workDir, "node1", node1P2PListen) + node2P2PAddr := mustNodeP2PMultiAddr(t, workDir, "node2", node2P2PListen) + node3P2PAddr := mustNodeP2PMultiAddr(t, workDir, "node3", node3P2PListen) // Start node1 (bootstrap mode) go func() { p2pPeers := node2P2PAddr + "," + node3P2PAddr proc := setupRaftSequencerNode(t, sut, workDir, "node1", node1RaftAddr, env.SequencerJWT, env.GenesisHash, env.Endpoints.GetDAAddress(), - bootstrapDir, raftCluster, p2pPeers, env.Endpoints.GetRollkitRPCListen(), env.Endpoints.GetRollkitP2PAddress(), + bootstrapDir, raftCluster, p2pPeers, env.Endpoints.GetRollkitRPCListen(), node1P2PListen, env.Endpoints.GetSequencerEngineURL(), env.Endpoints.GetSequencerEthURL(), true, passphraseFile) - clusterNodes.Set("node1", env.Endpoints.GetRollkitRPCAddress(), proc, env.Endpoints.GetSequencerEthURL(), node1RaftAddr, env.Endpoints.GetRollkitP2PAddress(), 
env.Endpoints.GetSequencerEngineURL(), env.Endpoints.GetSequencerEthURL()) + clusterNodes.Set("node1", env.Endpoints.GetRollkitRPCAddress(), proc, env.Endpoints.GetSequencerEthURL(), node1RaftAddr, node1P2PListen, node1P2PAddr, env.Endpoints.GetSequencerEngineURL(), env.Endpoints.GetSequencerEthURL()) t.Log("Node1 is up") }() - // Start node2 (bootstrap node) + // Start node2 go func() { t.Log("Starting Node2") p2pPeers := node1P2PAddr + "," + node3P2PAddr - proc := setupRaftSequencerNode(t, sut, workDir, "node2", node2RaftAddr, env.FullNodeJWT, env.GenesisHash, env.Endpoints.GetDAAddress(), bootstrapDir, raftCluster, p2pPeers, env.Endpoints.GetFullNodeRPCListen(), env.Endpoints.GetFullNodeP2PAddress(), env.Endpoints.GetFullNodeEngineURL(), env.Endpoints.GetFullNodeEthURL(), true, passphraseFile) - clusterNodes.Set("node2", env.Endpoints.GetFullNodeRPCAddress(), proc, env.Endpoints.GetFullNodeEthURL(), node2RaftAddr, env.Endpoints.GetFullNodeP2PAddress(), env.Endpoints.GetFullNodeEngineURL(), env.Endpoints.GetFullNodeEthURL()) + proc := setupRaftSequencerNode(t, sut, workDir, "node2", node2RaftAddr, env.FullNodeJWT, env.GenesisHash, env.Endpoints.GetDAAddress(), bootstrapDir, raftCluster, p2pPeers, env.Endpoints.GetFullNodeRPCListen(), node2P2PListen, env.Endpoints.GetFullNodeEngineURL(), env.Endpoints.GetFullNodeEthURL(), true, passphraseFile) + clusterNodes.Set("node2", env.Endpoints.GetFullNodeRPCAddress(), proc, env.Endpoints.GetFullNodeEthURL(), node2RaftAddr, node2P2PListen, node2P2PAddr, env.Endpoints.GetFullNodeEngineURL(), env.Endpoints.GetFullNodeEthURL()) t.Log("Node2 is up") }() - // Start node3 (bootstrap node) + // Start node3 node3EthAddr := fmt.Sprintf("http://127.0.0.1:%s", fullNode3EthPort) go func() { t.Log("Starting Node3") p2pPeers := node1P2PAddr + "," + node2P2PAddr node3RPCListen := fmt.Sprintf("127.0.0.1:%d", mustGetAvailablePort(t)) ethEngineURL := fmt.Sprintf("http://127.0.0.1:%s", fullNode3EnginePort) - proc := setupRaftSequencerNode(t, sut, workDir, "node3", node3RaftAddr, jwtSecret3, env.GenesisHash, env.Endpoints.GetDAAddress(), bootstrapDir, raftCluster, p2pPeers, node3RPCListen, node3P2PAddr, ethEngineURL, node3EthAddr, true, passphraseFile) - clusterNodes.Set("node3", "http://"+node3RPCListen, proc, node3EthAddr, node3RaftAddr, node3P2PAddr, ethEngineURL, node3EthAddr) + proc := setupRaftSequencerNode(t, sut, workDir, "node3", node3RaftAddr, jwtSecret3, env.GenesisHash, env.Endpoints.GetDAAddress(), bootstrapDir, raftCluster, p2pPeers, node3RPCListen, node3P2PListen, ethEngineURL, node3EthAddr, true, passphraseFile) + clusterNodes.Set("node3", "http://"+node3RPCListen, proc, node3EthAddr, node3RaftAddr, node3P2PListen, node3P2PAddr, ethEngineURL, node3EthAddr) t.Log("Node3 is up") }() @@ -172,11 +177,11 @@ func TestLeaseFailoverE2E(t *testing.T) { } } oldDetails := clusterNodes.Details(oldLeader) - restartedNodeProcess := setupRaftSequencerNode(t, sut, workDir, oldLeader, oldDetails.raftAddr, env.SequencerJWT, env.GenesisHash, env.Endpoints.GetDAAddress(), "", raftCluster, clusterNodes.Details(newLeader).p2pAddr, oldDetails.rpcAddr, oldDetails.p2pAddr, oldDetails.engineURL, oldDetails.ethAddr, false, passphraseFile) + restartedNodeProcess := setupRaftSequencerNode(t, sut, workDir, oldLeader, oldDetails.raftAddr, env.SequencerJWT, env.GenesisHash, env.Endpoints.GetDAAddress(), "", raftCluster, clusterNodes.Details(newLeader).p2pPeerAddr, oldDetails.rpcAddr, oldDetails.p2pAddr, oldDetails.engineURL, oldDetails.ethAddr, false, passphraseFile) t.Log("Restarted 
old leader to sync with cluster: " + oldLeader) if IsNodeUp(t, oldDetails.rpcAddr, NodeStartupTimeout) { - clusterNodes.Set(oldLeader, oldDetails.rpcAddr, restartedNodeProcess, oldDetails.ethAddr, oldDetails.raftAddr, "", oldDetails.engineURL, oldDetails.ethAddr) + clusterNodes.Set(oldLeader, oldDetails.rpcAddr, restartedNodeProcess, oldDetails.ethAddr, oldDetails.raftAddr, oldDetails.p2pAddr, oldDetails.p2pPeerAddr, oldDetails.engineURL, oldDetails.ethAddr) } else { t.Log("+++ old leader did not recover on restart. Skipping node verification") } @@ -276,38 +281,41 @@ func TestHASequencerRollingRestartE2E(t *testing.T) { clusterNodes := &raftClusterNodes{ nodes: make(map[string]*nodeDetails), } - node1P2PAddr := env.Endpoints.GetRollkitP2PAddress() - node2P2PAddr := env.Endpoints.GetFullNodeP2PAddress() - node3P2PAddr := fmt.Sprintf("/ip4/127.0.0.1/tcp/%d", mustGetAvailablePort(t)) + node1P2PListen := env.Endpoints.GetRollkitP2PAddress() + node2P2PListen := env.Endpoints.GetFullNodeP2PAddress() + node3P2PListen := fmt.Sprintf("/ip4/127.0.0.1/tcp/%d", mustGetAvailablePort(t)) + node1P2PAddr := mustNodeP2PMultiAddr(t, workDir, "node1", node1P2PListen) + node2P2PAddr := mustNodeP2PMultiAddr(t, workDir, "node2", node2P2PListen) + node3P2PAddr := mustNodeP2PMultiAddr(t, workDir, "node3", node3P2PListen) // Start node1 (bootstrap mode) go func() { p2pPeers := node2P2PAddr + "," + node3P2PAddr proc := setupRaftSequencerNode(t, sut, workDir, "node1", node1RaftAddr, env.SequencerJWT, env.GenesisHash, env.Endpoints.GetDAAddress(), - bootstrapDir, raftCluster, p2pPeers, env.Endpoints.GetRollkitRPCListen(), env.Endpoints.GetRollkitP2PAddress(), + bootstrapDir, raftCluster, p2pPeers, env.Endpoints.GetRollkitRPCListen(), node1P2PListen, env.Endpoints.GetSequencerEngineURL(), env.Endpoints.GetSequencerEthURL(), true, passphraseFile) - clusterNodes.Set("node1", env.Endpoints.GetRollkitRPCAddress(), proc, env.Endpoints.GetSequencerEthURL(), node1RaftAddr, env.Endpoints.GetRollkitP2PAddress(), env.Endpoints.GetSequencerEngineURL(), env.Endpoints.GetSequencerEthURL()) + clusterNodes.Set("node1", env.Endpoints.GetRollkitRPCAddress(), proc, env.Endpoints.GetSequencerEthURL(), node1RaftAddr, node1P2PListen, node1P2PAddr, env.Endpoints.GetSequencerEngineURL(), env.Endpoints.GetSequencerEthURL()) t.Log("Node1 is up") }() - // Start node2 (bootstrap node) + // Start node2 go func() { t.Log("Starting Node2") p2pPeers := node1P2PAddr + "," + node3P2PAddr - proc := setupRaftSequencerNode(t, sut, workDir, "node2", node2RaftAddr, env.FullNodeJWT, env.GenesisHash, env.Endpoints.GetDAAddress(), bootstrapDir, raftCluster, p2pPeers, env.Endpoints.GetFullNodeRPCListen(), env.Endpoints.GetFullNodeP2PAddress(), env.Endpoints.GetFullNodeEngineURL(), env.Endpoints.GetFullNodeEthURL(), true, passphraseFile) - clusterNodes.Set("node2", env.Endpoints.GetFullNodeRPCAddress(), proc, env.Endpoints.GetFullNodeEthURL(), node2RaftAddr, env.Endpoints.GetFullNodeP2PAddress(), env.Endpoints.GetFullNodeEngineURL(), env.Endpoints.GetFullNodeEthURL()) + proc := setupRaftSequencerNode(t, sut, workDir, "node2", node2RaftAddr, env.FullNodeJWT, env.GenesisHash, env.Endpoints.GetDAAddress(), bootstrapDir, raftCluster, p2pPeers, env.Endpoints.GetFullNodeRPCListen(), node2P2PListen, env.Endpoints.GetFullNodeEngineURL(), env.Endpoints.GetFullNodeEthURL(), true, passphraseFile) + clusterNodes.Set("node2", env.Endpoints.GetFullNodeRPCAddress(), proc, env.Endpoints.GetFullNodeEthURL(), node2RaftAddr, node2P2PListen, node2P2PAddr, 
env.Endpoints.GetFullNodeEngineURL(), env.Endpoints.GetFullNodeEthURL()) t.Log("Node2 is up") }() - // Start node3 (bootstrap node) + // Start node3 node3EthAddr := fmt.Sprintf("http://127.0.0.1:%s", fullNode3EthPort) go func() { t.Log("Starting Node3") p2pPeers := node1P2PAddr + "," + node2P2PAddr node3RPCListen := fmt.Sprintf("127.0.0.1:%d", mustGetAvailablePort(t)) ethEngineURL := fmt.Sprintf("http://127.0.0.1:%s", fullNode3EnginePort) - proc := setupRaftSequencerNode(t, sut, workDir, "node3", node3RaftAddr, jwtSecret3, env.GenesisHash, env.Endpoints.GetDAAddress(), bootstrapDir, raftCluster, p2pPeers, node3RPCListen, node3P2PAddr, ethEngineURL, node3EthAddr, true, passphraseFile) - clusterNodes.Set("node3", "http://"+node3RPCListen, proc, node3EthAddr, node3RaftAddr, node3P2PAddr, ethEngineURL, node3EthAddr) + proc := setupRaftSequencerNode(t, sut, workDir, "node3", node3RaftAddr, jwtSecret3, env.GenesisHash, env.Endpoints.GetDAAddress(), bootstrapDir, raftCluster, p2pPeers, node3RPCListen, node3P2PListen, ethEngineURL, node3EthAddr, true, passphraseFile) + clusterNodes.Set("node3", "http://"+node3RPCListen, proc, node3EthAddr, node3RaftAddr, node3P2PListen, node3P2PAddr, ethEngineURL, node3EthAddr) t.Log("Node3 is up") }() @@ -393,7 +401,7 @@ func TestHASequencerRollingRestartE2E(t *testing.T) { nodeDetails.engineURL, nodeDetails.ethAddr, false, passphraseFile) clusterNodes.Set(nodeName, nodeDetails.rpcAddr, restartedProc, nodeDetails.ethAddr, - nodeDetails.raftAddr, nodeDetails.p2pAddr, nodeDetails.engineURL, nodeDetails.ethAddr) + nodeDetails.raftAddr, nodeDetails.p2pAddr, nodeDetails.p2pPeerAddr, nodeDetails.engineURL, nodeDetails.ethAddr) } // Initial restart of all nodes @@ -721,13 +729,23 @@ func initChain(t *testing.T, sut *SystemUnderTest, workDir string) string { require.NoError(t, err, "failed to init node", output) return passphraseFile } + +func mustNodeP2PMultiAddr(t *testing.T, workDir, nodeID, listenAddr string) string { + t.Helper() + nodeKey, err := key.LoadOrGenNodeKey(filepath.Join(workDir, nodeID, "config")) + require.NoError(t, err) + peerID, err := peer.IDFromPrivateKey(nodeKey.PrivKey) + require.NoError(t, err) + return fmt.Sprintf("%s/p2p/%s", listenAddr, peerID.String()) +} + func setupRaftSequencerNode( t *testing.T, sut *SystemUnderTest, workDir, nodeID, raftAddr, jwtSecret, genesisHash, daAddress, bootstrapDir string, allRaftClusterMembers []string, p2pPeers, rpcAddr, p2pAddr, engineURL, ethURL string, - bootstrap bool, + raftBootstrap bool, passphraseFile string, ) *os.Process { t.Helper() @@ -735,7 +753,7 @@ func setupRaftSequencerNode( raftDir := filepath.Join(nodeHome, "raft") jwtSecretFile := filepath.Join(nodeHome, "jwt-secret.hex") - if bootstrap { + if bootstrapDir != "" { initChain(t, sut, nodeHome) jwtSecretFile = createJWTSecretFile(t, nodeHome, jwtSecret) @@ -770,7 +788,7 @@ func setupRaftSequencerNode( "--evnode.raft.node_id="+nodeID, "--evnode.raft.raft_addr="+raftAddr, "--evnode.raft.raft_dir="+raftDir, - "--evnode.raft.bootstrap=true", + fmt.Sprintf("--evnode.raft.bootstrap=%t", raftBootstrap), "--evnode.raft.peers="+strings.Join(raftPeers, ","), "--evnode.raft.snap_count=10", "--evnode.raft.send_timeout=100ms", @@ -843,6 +861,7 @@ type nodeDetails struct { xRPCClient atomic.Pointer[rpcclient.Client] running atomic.Bool p2pAddr string + p2pPeerAddr string engineURL string ethURL string } @@ -895,10 +914,10 @@ type raftClusterNodes struct { nodes map[string]*nodeDetails } -func (c *raftClusterNodes) Set(node string, listen string, proc 
*os.Process, eth string, raftAddr string, p2pAddr string, engineURL string, ethURL string) { +func (c *raftClusterNodes) Set(node string, listen string, proc *os.Process, eth string, raftAddr string, p2pAddr string, p2pPeerAddr string, engineURL string, ethURL string) { c.mx.Lock() defer c.mx.Unlock() - d := &nodeDetails{raftAddr: raftAddr, rpcAddr: listen, process: proc, ethAddr: eth, p2pAddr: p2pAddr, engineURL: engineURL, ethURL: ethURL} + d := &nodeDetails{raftAddr: raftAddr, rpcAddr: listen, process: proc, ethAddr: eth, p2pAddr: p2pAddr, p2pPeerAddr: p2pPeerAddr, engineURL: engineURL, ethURL: ethURL} d.running.Store(true) c.nodes[node] = d } @@ -945,13 +964,27 @@ func leader(t require.TestingT, nodes map[string]*nodeDetails) (string, *nodeDet continue } resp, err := client.Get(details.rpcAddr + "/raft/node") - require.NoError(t, err) - defer resp.Body.Close() + if err != nil { + continue + } - var status nodeStatus - require.NoError(t, json.NewDecoder(resp.Body).Decode(&status)) + isLeader := false + func() { + defer resp.Body.Close() + if resp.StatusCode != http.StatusOK { + return + } - if status.IsLeader { + var status nodeStatus + if err := json.NewDecoder(resp.Body).Decode(&status); err != nil { + return + } + + if status.IsLeader { + isLeader = true + } + }() + if isLeader { return node, details } } @@ -973,16 +1006,17 @@ func must[T any](r T, err error) T { func IsNodeUp(t *testing.T, rpcAddr string, timeout time.Duration) bool { t.Helper() t.Logf("Query node is up: %s", rpcAddr) - ctx, done := context.WithTimeout(context.Background(), timeout) + ctx, done := context.WithTimeout(t.Context(), timeout) defer done() - ticker := time.Tick(min(timeout/10, 200*time.Millisecond)) + ticker := time.NewTicker(min(timeout/10, 200*time.Millisecond)) + defer ticker.Stop() c := client.NewClient(rpcAddr) require.NotNil(t, c) var lastBlock uint64 for { select { - case <-ticker: + case <-ticker.C: switch s, err := c.GetState(ctx); { case err != nil: // ignore case lastBlock == 0: diff --git a/test/e2e/sut_helper.go b/test/e2e/sut_helper.go index f5783da8bb..633d1efdbb 100644 --- a/test/e2e/sut_helper.go +++ b/test/e2e/sut_helper.go @@ -162,8 +162,12 @@ func (s *SystemUnderTest) awaitProcessCleanup(cmd *exec.Cmd) { s.cmdToPids[cmdKey] = append(s.cmdToPids[cmdKey], pid) s.pidsLock.Unlock() go func() { - _ = cmd.Wait() // blocks until shutdown - s.logf("Process stopped, pid: %d\n", pid) + waitErr := cmd.Wait() // blocks until shutdown + if waitErr != nil { + s.logf("Process stopped, pid: %d, err: %v\n", pid, waitErr) + } else { + s.logf("Process stopped, pid: %d\n", pid) + } s.pidsLock.Lock() defer s.pidsLock.Unlock() delete(s.pids, pid) @@ -182,11 +186,9 @@ func (s *SystemUnderTest) watchLogs(prefix string, cmd *exec.Cmd) { outReader, err := cmd.StdoutPipe() require.NoError(s.t, err) - if s.debug { - logDir := filepath.Join(WorkDir, "testnet") + if logDir := s.processLogDir(); logDir != "" { require.NoError(s.t, os.MkdirAll(logDir, 0o750)) - testName := strings.ReplaceAll(s.t.Name(), "/", "-") - logfileName := filepath.Join(logDir, prefix+fmt.Sprintf("exec-%s-%s-%d.out", filepath.Base(cmd.Args[0]), testName, time.Now().UnixNano())) + logfileName := filepath.Join(logDir, prefix+fmt.Sprintf("exec-%s-%d.out", filepath.Base(cmd.Args[0]), time.Now().UnixNano())) logfile, err := os.Create(logfileName) require.NoError(s.t, err) errReader = io.NopCloser(io.TeeReader(errReader, logfile)) @@ -202,6 +204,19 @@ func (s *SystemUnderTest) watchLogs(prefix string, cmd *exec.Cmd) { }) } +func (s 
*SystemUnderTest) processLogDir() string { + logRoot := strings.TrimSpace(os.Getenv("EV_E2E_LOG_DIR")) + if logRoot == "" && s.debug { + logRoot = filepath.Join(WorkDir, "testnet") + } + if logRoot == "" { + return "" + } + + testName := strings.ReplaceAll(s.t.Name(), "/", "-") + return filepath.Join(logRoot, testName) +} + // PrintBuffer outputs the contents of outBuff and errBuff to stdout, prefixing each entry with "out>" or "err>", respectively. func (s *SystemUnderTest) PrintBuffer() { out := os.Stdout From 74641e06abf671579d754a17dd515ebe0e59a529 Mon Sep 17 00:00:00 2001 From: Alex Peters Date: Tue, 31 Mar 2026 18:06:53 +0200 Subject: [PATCH 2/4] Changelog --- CHANGELOG.md | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/CHANGELOG.md b/CHANGELOG.md index defc357d1f..095ef4346d 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -9,10 +9,16 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ## [Unreleased] +<<<<<<< HEAD ### Changes * Added publisher-mode synchronization option for failover scenarios with early P2P infrastructure readiness [#3222](https://github.com/evstack/ev-node/pull/3222) * Improve execution/evm check for stored meta not stale [#3221](https://github.com/evstack/ev-node/pull/3221) +======= +### Added + +* Added publisher-mode synchronization option for failover scenarios with early P2P infrastructure readiness [#3222](https://github.com/evstack/ev-node/pull/3222) +>>>>>>> a2b0ff76 (Changelog) ## v1.1.0-rc.1 From 53f5ded75d3c99c0aba8588c8b1cff176d7b2ef9 Mon Sep 17 00:00:00 2001 From: Alex Peters Date: Thu, 2 Apr 2026 13:42:23 +0200 Subject: [PATCH 3/4] Review feedback --- CHANGELOG.md | 6 ------ block/internal/syncing/syncer.go | 8 -------- 2 files changed, 14 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 095ef4346d..defc357d1f 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -9,16 +9,10 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ## [Unreleased] -<<<<<<< HEAD ### Changes * Added publisher-mode synchronization option for failover scenarios with early P2P infrastructure readiness [#3222](https://github.com/evstack/ev-node/pull/3222) * Improve execution/evm check for stored meta not stale [#3221](https://github.com/evstack/ev-node/pull/3221) -======= -### Added - -* Added publisher-mode synchronization option for failover scenarios with early P2P infrastructure readiness [#3222](https://github.com/evstack/ev-node/pull/3222) ->>>>>>> a2b0ff76 (Changelog) ## v1.1.0-rc.1 diff --git a/block/internal/syncing/syncer.go b/block/internal/syncing/syncer.go index 37759683e1..802c1b243d 100644 --- a/block/internal/syncing/syncer.go +++ b/block/internal/syncing/syncer.go @@ -1194,7 +1194,6 @@ func (s *Syncer) RecoverFromRaft(ctx context.Context, raftState *raft.RaftBlockS } currentState := s.getLastState() - stateBootstrapped := false // Defensive: if lastState is not yet initialized (e.g., RecoverFromRaft called before Start), // load it from the store to ensure we have valid state for validation. 
@@ -1210,7 +1209,6 @@ func (s *Syncer) RecoverFromRaft(ctx context.Context, raftState *raft.RaftBlockS InitialHeight: s.genesis.InitialHeight, LastBlockHeight: s.genesis.InitialHeight - 1, } - stateBootstrapped = true } } @@ -1228,12 +1226,6 @@ func (s *Syncer) RecoverFromRaft(ctx context.Context, raftState *raft.RaftBlockS Source: "", } err := s.trySyncNextBlockWithState(ctx, event, currentState) - if err != nil && stateBootstrapped && errors.Is(err, errInvalidState) { - s.logger.Debug().Err(err).Msg("raft recovery failed after bootstrap state init, retrying once") - // Keep strict validation semantics; this retry only guards startup ordering races. - s.SetLastState(currentState) - err = s.trySyncNextBlockWithState(ctx, event, currentState) - } if err != nil { return err } From a2607e54c1d7ec8b382146b170ccaf8ae3b03d63 Mon Sep 17 00:00:00 2001 From: Alex Peters Date: Thu, 2 Apr 2026 13:45:36 +0200 Subject: [PATCH 4/4] Doc update --- docs/guides/raft_production.md | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/docs/guides/raft_production.md b/docs/guides/raft_production.md index 7789bf80ad..9e02758da9 100644 --- a/docs/guides/raft_production.md +++ b/docs/guides/raft_production.md @@ -22,7 +22,7 @@ This guide details the Raft consensus implementation in `ev-node`, used for High ## Configuration -Raft is configured via CLI flags or the `config.toml` file under the `[raft]` (or `[rollkit.raft]`) section. +Raft is configured via CLI flags or the `config.toml` file under the `[raft]` (or `[evnode.raft]`) section. ### Essential Flags @@ -90,13 +90,13 @@ Monitor the following metrics (propagated via Prometheus if enabled): ```bash ./ev-node start \ - --rollkit.node.aggregator=true \ + --evnode.node.aggregator=true \ --evnode.raft.enable=true \ --evnode.raft.node_id="node-1" \ --evnode.raft.raft_addr="0.0.0.0:5001" \ --evnode.raft.raft_dir="/var/lib/ev-node/raft" \ --evnode.raft.bootstrap=true \ --evnode.raft.peers="node-1@10.0.1.1:5001,node-2@10.0.1.2:5001,node-3@10.0.1.3:5001" \ - --rollkit.p2p.listen_address="/ip4/0.0.0.0/tcp/26656" \ + --evnode.p2p.listen_address="/ip4/0.0.0.0/tcp/26656" \ ...other flags ```