diff --git a/CHANGELOG.md b/CHANGELOG.md index 912343ec00..119fcf00e6 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -9,6 +9,10 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ## [Unreleased] +### Changes + +- Improve P2P connection in case of transient network failure [#3212](https://github.com/evstack/ev-node/pull/3212) + ## v1.1.0-rc.1 ### Added diff --git a/apps/evm/go.mod b/apps/evm/go.mod index edd9c14311..cdce2d0ee3 100644 --- a/apps/evm/go.mod +++ b/apps/evm/go.mod @@ -2,10 +2,10 @@ module github.com/evstack/ev-node/apps/evm go 1.25.7 -// replace ( -// github.com/evstack/ev-node => ../../ -// github.com/evstack/ev-node/execution/evm => ../../execution/evm -// ) +replace ( + github.com/evstack/ev-node => ../../ + github.com/evstack/ev-node/execution/evm => ../../execution/evm +) require ( github.com/ethereum/go-ethereum v1.17.2 diff --git a/apps/evm/go.sum b/apps/evm/go.sum index 239a59c985..e0249473d4 100644 --- a/apps/evm/go.sum +++ b/apps/evm/go.sum @@ -472,12 +472,8 @@ github.com/ethereum/go-bigmodexpfix v0.0.0-20250911101455-f9e208c548ab h1:rvv6MJ github.com/ethereum/go-bigmodexpfix v0.0.0-20250911101455-f9e208c548ab/go.mod h1:IuLm4IsPipXKF7CW5Lzf68PIbZ5yl7FFd74l/E0o9A8= github.com/ethereum/go-ethereum v1.17.2 h1:ag6geu0kn8Hv5FLKTpH+Hm2DHD+iuFtuqKxEuwUsDOI= github.com/ethereum/go-ethereum v1.17.2/go.mod h1:KHcRXfGOUfUmKg51IhQ0IowiqZ6PqZf08CMtk0g5K1o= -github.com/evstack/ev-node v1.1.0-rc.1 h1:NtPuuDLqN2h4/edu5zxRlZAxmLkTG3ncXBO2PlCDvVs= -github.com/evstack/ev-node v1.1.0-rc.1/go.mod h1:6rhWWzuyiqNn/erDmWCk1aLxUuQphyOGIRq56/smSyk= github.com/evstack/ev-node/core v1.0.0 h1:s0Tx0uWHme7SJn/ZNEtee4qNM8UO6PIxXnHhPbbKTz8= github.com/evstack/ev-node/core v1.0.0/go.mod h1:n2w/LhYQTPsi48m6lMj16YiIqsaQw6gxwjyJvR+B3sY= -github.com/evstack/ev-node/execution/evm v1.0.0 h1:UTAdCrnPsLoGzSgsBx4Kv76jkXpMmHBIpNv3MxyzWPo= -github.com/evstack/ev-node/execution/evm v1.0.0/go.mod h1:UrqkiepfTMiot6M8jnswgu3VU8SSucZpaMIHIl22/1A= github.com/fatih/color v1.7.0/go.mod h1:Zm6kSWBoL9eyXnKyktHP6abPY2pDugNf5KwzbycvMj4= github.com/fatih/color v1.10.0/go.mod h1:ELkj/draVOlAH/xkhN6mQ50Qd0MPOk5AAr3maGEBuJM= github.com/fatih/color v1.13.0/go.mod h1:kLAiJbzzSOZDVNGyDpeOxJ47H46qBXwg5ILebYFFOfk= diff --git a/apps/testapp/go.mod b/apps/testapp/go.mod index c235c3f07b..aeb4b4bbb2 100644 --- a/apps/testapp/go.mod +++ b/apps/testapp/go.mod @@ -2,7 +2,7 @@ module github.com/evstack/ev-node/apps/testapp go 1.25.7 -// replace github.com/evstack/ev-node => ../../. +replace github.com/evstack/ev-node => ../../. require ( github.com/evstack/ev-node v1.1.0-rc.1 diff --git a/apps/testapp/go.sum b/apps/testapp/go.sum index be901c574f..8670e575e8 100644 --- a/apps/testapp/go.sum +++ b/apps/testapp/go.sum @@ -432,8 +432,6 @@ github.com/envoyproxy/protoc-gen-validate v1.0.1/go.mod h1:0vj8bNkYbSTNS2PIyH87K github.com/envoyproxy/protoc-gen-validate v1.0.2/go.mod h1:GpiZQP3dDbg4JouG/NNS7QWXpgx6x8QiMKdmN72jogE= github.com/envoyproxy/protoc-gen-validate v1.3.0 h1:TvGH1wof4H33rezVKWSpqKz5NXWg5VPuZ0uONDT6eb4= github.com/envoyproxy/protoc-gen-validate v1.3.0/go.mod h1:HvYl7zwPa5mffgyeTUHA9zHIH36nmrm7oCbo4YKoSWA= -github.com/evstack/ev-node v1.1.0-rc.1 h1:NtPuuDLqN2h4/edu5zxRlZAxmLkTG3ncXBO2PlCDvVs= -github.com/evstack/ev-node v1.1.0-rc.1/go.mod h1:6rhWWzuyiqNn/erDmWCk1aLxUuQphyOGIRq56/smSyk= github.com/evstack/ev-node/core v1.0.0 h1:s0Tx0uWHme7SJn/ZNEtee4qNM8UO6PIxXnHhPbbKTz8= github.com/evstack/ev-node/core v1.0.0/go.mod h1:n2w/LhYQTPsi48m6lMj16YiIqsaQw6gxwjyJvR+B3sY= github.com/fatih/color v1.7.0/go.mod h1:Zm6kSWBoL9eyXnKyktHP6abPY2pDugNf5KwzbycvMj4= diff --git a/pkg/p2p/client.go b/pkg/p2p/client.go index e8fc6eabe1..b9985378ce 100644 --- a/pkg/p2p/client.go +++ b/pkg/p2p/client.go @@ -6,6 +6,7 @@ import ( "errors" "fmt" "strings" + "sync" "time" "github.com/ipfs/go-datastore" @@ -28,13 +29,26 @@ import ( rollhash "github.com/evstack/ev-node/pkg/hash" ) -// TODO(tzdybal): refactor to configuration parameters const ( // reAdvertisePeriod defines a period after which P2P client re-attempt advertising namespace in DHT. reAdvertisePeriod = 1 * time.Hour // peerLimit defines limit of number of peers returned during active peer discovery. peerLimit = 60 + + // peerDiscoveryInterval is how often the background loop re-advertises and + // re-runs peer discovery via DHT. + peerDiscoveryInterval = 5 * time.Minute + + // reconnectCooldown is the base cooldown between reconnect attempts for the same seed peer. + reconnectCooldown = 5 * time.Second + + // maxReconnectCooldown caps the exponential backoff for seed peer reconnection. + maxReconnectCooldown = 5 * time.Minute + + // connectWorkers limits the number of concurrent connection attempts during + // periodic peer discovery refresh. + connectWorkers = 16 ) // Client is a P2P client, implemented with libp2p. @@ -56,6 +70,13 @@ type Client struct { ps *pubsub.PubSub started bool + seedPeers []peer.AddrInfo + + maintenanceCancel context.CancelFunc + maintenanceWg sync.WaitGroup + reconnectCh chan peer.ID + connectSem chan struct{} + metrics *Metrics } @@ -164,17 +185,29 @@ func (c *Client) startWithHost(ctx context.Context, h host.Host) error { return err } + c.reconnectCh = make(chan peer.ID, 32) + c.connectSem = make(chan struct{}, connectWorkers) + c.logger.Debug().Msg("setting up active peer discovery") if err := c.peerDiscovery(ctx); err != nil { return err } c.started = true + + c.host.Network().Notify(c.newDisconnectNotifee()) + c.startConnectionMaintenance() + return nil } // Close gently stops Client. func (c *Client) Close() error { + if c.maintenanceCancel != nil { + c.maintenanceCancel() + } + c.maintenanceWg.Wait() + var err error if c.dht != nil { err = errors.Join(err, c.dht.Close()) @@ -245,6 +278,153 @@ func (c *Client) Peers() []PeerConnection { return res } +// disconnectNotifee is a network.Notifee that triggers seed peer reconnection +// when a configured seed peer disconnects. +type disconnectNotifee struct { + c *Client +} + +func (n disconnectNotifee) Connected(_ network.Network, _ network.Conn) {} +func (n disconnectNotifee) OpenedStream(_ network.Network, _ network.Stream) {} +func (n disconnectNotifee) ClosedStream(_ network.Network, _ network.Stream) {} +func (n disconnectNotifee) Listen(_ network.Network, _ multiaddr.Multiaddr) {} +func (n disconnectNotifee) ListenClose(_ network.Network, _ multiaddr.Multiaddr) {} + +func (n disconnectNotifee) Disconnected(_ network.Network, conn network.Conn) { + p := conn.RemotePeer() + if n.c.reconnectCh == nil { + return + } + for _, sp := range n.c.seedPeers { + if sp.ID == p { + select { + case n.c.reconnectCh <- p: + default: + } + return + } + } +} + +func (c *Client) newDisconnectNotifee() disconnectNotifee { + return disconnectNotifee{c: c} +} + +// startConnectionMaintenance launches a background goroutine that reconnects +// to seed peers on disconnect (driven by network.Notifee events) and +// periodically refreshes peer discovery. This ensures P2P connectivity +// recovers after transient network failures without requiring a full node restart. +func (c *Client) startConnectionMaintenance() { + ctx, cancel := context.WithCancel(context.Background()) + c.maintenanceCancel = cancel + + c.maintenanceWg.Go(func() { + + discoveryTicker := time.NewTicker(peerDiscoveryInterval) + defer discoveryTicker.Stop() + + type reconnectState struct { + lastAttempt time.Time + attempts int + } + states := make(map[peer.ID]*reconnectState) + + for { + select { + case <-ctx.Done(): + return + case pid := <-c.reconnectCh: + st := states[pid] + if st == nil { + st = &reconnectState{} + states[pid] = st + } + + if time.Since(st.lastAttempt) > maxReconnectCooldown { + st.attempts = 0 + } + + backoff := reconnectCooldown * time.Duration(1< maxReconnectCooldown { + backoff = maxReconnectCooldown + } + if time.Now().Before(st.lastAttempt.Add(backoff)) { + remaining := time.Until(st.lastAttempt.Add(backoff)) + time.AfterFunc(remaining, func() { + select { + case c.reconnectCh <- pid: + default: + } + }) + continue + } + st.lastAttempt = time.Now() + + for _, sp := range c.seedPeers { + if sp.ID != pid { + continue + } + if c.isConnected(sp.ID) { + st.attempts = 0 + break + } + st.attempts++ + c.logger.Info().Str("peer", sp.ID.String()).Msg("reconnecting to disconnected seed peer") + go func(info peer.AddrInfo) { + if err := c.host.Connect(ctx, info); err != nil && ctx.Err() == nil { + c.logger.Warn().Str("peer", info.ID.String()).Err(err).Msg("failed to reconnect to seed peer") + select { + case c.reconnectCh <- info.ID: + default: + } + } + }(sp) + break + } + case <-discoveryTicker.C: + c.refreshPeerDiscovery(ctx) + } + } + }) +} + +// refreshPeerDiscovery re-advertises and re-runs peer discovery via DHT. +func (c *Client) refreshPeerDiscovery(ctx context.Context) { + if c.disc == nil { + return + } + + c.logger.Debug().Msg("refreshing peer discovery") + + _ = c.advertise(ctx) + + peerCh, err := c.disc.FindPeers(ctx, c.getNamespace(), cdiscovery.Limit(peerLimit)) + if err != nil { + c.logger.Warn().Err(err).Msg("peer discovery refresh failed") + return + } + + for p := range peerCh { + if p.ID == c.host.ID() || c.isConnected(p.ID) { + continue + } + select { + case c.connectSem <- struct{}{}: + go func(peer peer.AddrInfo) { + defer func() { <-c.connectSem }() + c.tryConnect(ctx, peer) + }(p) + case <-ctx.Done(): + return + } + } +} + +// isConnected returns true if there is an active connection to the given peer. +func (c *Client) isConnected(id peer.ID) bool { + return c.host.Network().Connectedness(id) == network.Connected +} + func (c *Client) listen() (host.Host, error) { maddr, err := multiaddr.NewMultiaddr(c.conf.ListenAddress) if err != nil { @@ -256,6 +436,7 @@ func (c *Client) listen() (host.Host, error) { func (c *Client) setupDHT(ctx context.Context) error { peers := c.parseAddrInfoList(c.conf.Peers) + c.seedPeers = peers if len(peers) == 0 { c.logger.Info().Msg("no peers - only listening for connections") } diff --git a/pkg/signer/aws/signer.go b/pkg/signer/aws/signer.go index 4c0d9e1d63..f5a2d5c270 100644 --- a/pkg/signer/aws/signer.go +++ b/pkg/signer/aws/signer.go @@ -159,7 +159,7 @@ func (s *KmsSigner) Sign(ctx context.Context, message []byte) ([]byte, error) { timeout := s.opts.timeout() maxAttempts := maxRetries + 1 - for attempt := 0; attempt < maxAttempts; attempt++ { + for attempt := range maxAttempts { if err := ctx.Err(); err != nil { return nil, err } diff --git a/pkg/signer/gcp/signer.go b/pkg/signer/gcp/signer.go index 70f6667c24..2dc8b27a31 100644 --- a/pkg/signer/gcp/signer.go +++ b/pkg/signer/gcp/signer.go @@ -189,7 +189,7 @@ func (s *KmsSigner) Sign(ctx context.Context, message []byte) ([]byte, error) { timeout := s.opts.timeout() maxAttempts := maxRetries + 1 - for attempt := 0; attempt < maxAttempts; attempt++ { + for attempt := range maxAttempts { if err := ctx.Err(); err != nil { return nil, err }