-
Notifications
You must be signed in to change notification settings - Fork 255
feat(pkg/p2p): reconnect on disconnected peers #3212
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Changes from all commits
28d6894
cf7c3ce
6069213
481f866
53e61a3
32dcf35
2ff6cee
e276ea2
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -6,6 +6,7 @@ | |
| "errors" | ||
| "fmt" | ||
| "strings" | ||
| "sync" | ||
| "time" | ||
|
|
||
| "github.com/ipfs/go-datastore" | ||
|
|
@@ -28,13 +29,26 @@ | |
| rollhash "github.com/evstack/ev-node/pkg/hash" | ||
| ) | ||
|
|
||
| // TODO(tzdybal): refactor to configuration parameters | ||
| const ( | ||
| // reAdvertisePeriod defines the period after which the P2P client re-attempts advertising its namespace in the DHT. | ||
| reAdvertisePeriod = 1 * time.Hour | ||
|
|
||
| // peerLimit defines the maximum number of peers returned during active peer discovery. | ||
| peerLimit = 60 | ||
|
|
||
| // peerDiscoveryInterval is how often the background loop re-advertises and | ||
| // re-runs peer discovery via DHT. | ||
| peerDiscoveryInterval = 5 * time.Minute | ||
|
|
||
| // reconnectCooldown is the base cooldown between reconnect attempts for the same seed peer. | ||
| // The maintenance loop doubles it for each consecutive attempt, up to maxReconnectCooldown. | ||
| reconnectCooldown = 5 * time.Second | ||
|
|
||
| // maxReconnectCooldown caps the exponential backoff for seed peer reconnection. | ||
| // The maintenance loop also resets a peer's attempt counter once this much time has passed since its last attempt. | ||
| maxReconnectCooldown = 5 * time.Minute | ||
|
|
||
| // connectWorkers limits the number of concurrent connection attempts during | ||
| // periodic peer discovery refresh. | ||
| connectWorkers = 16 | ||
| ) | ||
|
|
||
| // Client is a P2P client, implemented with libp2p. | ||
|
|
@@ -56,6 +70,13 @@ | |
| ps *pubsub.PubSub | ||
| started bool | ||
|
|
||
| seedPeers []peer.AddrInfo | ||
|
|
||
| maintenanceCancel context.CancelFunc | ||
| maintenanceWg sync.WaitGroup | ||
| reconnectCh chan peer.ID | ||
| connectSem chan struct{} | ||
|
|
||
| metrics *Metrics | ||
| } | ||
|
|
||
|
|
@@ -164,17 +185,29 @@ | |
| return err | ||
| } | ||
|
|
||
| c.reconnectCh = make(chan peer.ID, 32) | ||
| c.connectSem = make(chan struct{}, connectWorkers) | ||
|
|
||
| c.logger.Debug().Msg("setting up active peer discovery") | ||
| if err := c.peerDiscovery(ctx); err != nil { | ||
| return err | ||
| } | ||
|
|
||
| c.started = true | ||
|
|
||
| c.host.Network().Notify(c.newDisconnectNotifee()) | ||
| c.startConnectionMaintenance() | ||
|
|
||
| return nil | ||
| } | ||
|
|
||
| // Close gently stops Client. | ||
| func (c *Client) Close() error { | ||
| if c.maintenanceCancel != nil { | ||
| c.maintenanceCancel() | ||
| } | ||
| c.maintenanceWg.Wait() | ||
|
Comment on lines
+206
to
+209
Contributor
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Track and time-box the maintenance dials.
As per coding guidelines "Use context.Context for cancellation in Go" and "Be mindful of goroutine leaks in Go code". Also applies to: 373-381, 384-385, 401-419 🤖 Prompt for AI Agents |
||
|
|
||
| var err error | ||
| if c.dht != nil { | ||
| err = errors.Join(err, c.dht.Close()) | ||
|
|
@@ -245,6 +278,153 @@ | |
| return res | ||
| } | ||
|
|
||
| // disconnectNotifee is a network.Notifee that triggers seed peer reconnection | ||
| // when a configured seed peer disconnects. Only its Disconnected callback does | ||
| // any work; it pushes the peer ID onto the client's reconnectCh. | ||
| type disconnectNotifee struct { | ||
| // c is the client whose seedPeers and reconnectCh are consulted on disconnect. | ||
| c *Client | ||
| } | ||
|
|
||
| // The following callbacks are intentionally no-ops: disconnectNotifee only | ||
| // acts on Disconnected events. | ||
| func (n disconnectNotifee) Connected(_ network.Network, _ network.Conn) {} | ||
| func (n disconnectNotifee) OpenedStream(_ network.Network, _ network.Stream) {} | ||
| func (n disconnectNotifee) ClosedStream(_ network.Network, _ network.Stream) {} | ||
| func (n disconnectNotifee) Listen(_ network.Network, _ multiaddr.Multiaddr) {} | ||
| func (n disconnectNotifee) ListenClose(_ network.Network, _ multiaddr.Multiaddr) {} | ||
|
|
||
| // Disconnected queues the remote peer of a closed connection for reconnection | ||
| // when that peer is one of the client's configured seed peers. The send on | ||
| // reconnectCh is non-blocking, so the event is dropped when the queue is full. | ||
| // Nothing happens when reconnectCh is nil or the peer is not a seed peer. | ||
| func (n disconnectNotifee) Disconnected(_ network.Network, conn network.Conn) { | ||
| remote := conn.RemotePeer() | ||
| queue := n.c.reconnectCh | ||
| if queue == nil { | ||
| return | ||
| } | ||
| for i := range n.c.seedPeers { | ||
| if n.c.seedPeers[i].ID != remote { | ||
| continue | ||
| } | ||
| // Never block the network notifier: drop the event if the queue is full. | ||
| select { | ||
| case queue <- remote: | ||
| default: | ||
| } | ||
| return | ||
| } | ||
| } | ||
|
|
||
| func (c *Client) newDisconnectNotifee() disconnectNotifee { | ||
| return disconnectNotifee{c: c} | ||
| } | ||
|
|
||
| // startConnectionMaintenance launches a background goroutine that reconnects | ||
| // to seed peers on disconnect (driven by network.Notifee events) and | ||
| // periodically refreshes peer discovery. This ensures P2P connectivity | ||
| // recovers after transient network failures without requiring a full node restart. | ||
| // | ||
| // Seed peers that are not connected when maintenance starts are queued right | ||
| // away, because the notifee only reports future disconnect events. Each | ||
| // reconnect dial is time-boxed and registered with maintenanceWg, so Close | ||
| // (which cancels the maintenance context and then waits on maintenanceWg) | ||
| // does not return while dial goroutines are still running. | ||
| func (c *Client) startConnectionMaintenance() { | ||
| // reconnectDialTimeout bounds a single seed peer dial so a stuck dial cannot linger indefinitely. | ||
| const reconnectDialTimeout = 30 * time.Second | ||
|
|
||
| ctx, cancel := context.WithCancel(context.Background()) | ||
| c.maintenanceCancel = cancel | ||
|
|
||
| // Queue seed peers that are disconnected right now. The send is non-blocking, | ||
| // so peers beyond the channel capacity (or a nil channel) are simply skipped. | ||
| for _, sp := range c.seedPeers { | ||
| if c.isConnected(sp.ID) { | ||
| continue | ||
| } | ||
| select { | ||
| case c.reconnectCh <- sp.ID: | ||
| default: | ||
| } | ||
| } | ||
|
|
||
| c.maintenanceWg.Go(func() { | ||
|
Comment on lines
+317
to
+321
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Queue currently disconnected seed peers when maintenance starts. This worker only reacts to future 🤖 Prompt for AI Agents |
||
|
|
||
| discoveryTicker := time.NewTicker(peerDiscoveryInterval) | ||
| defer discoveryTicker.Stop() | ||
|
|
||
| // reconnectState tracks backoff bookkeeping for one seed peer. | ||
| type reconnectState struct { | ||
| lastAttempt time.Time | ||
| attempts int | ||
| } | ||
| states := make(map[peer.ID]*reconnectState) | ||
|
|
||
| for { | ||
| select { | ||
| case <-ctx.Done(): | ||
| return | ||
| case pid := <-c.reconnectCh: | ||
| st := states[pid] | ||
| if st == nil { | ||
| st = &reconnectState{} | ||
| states[pid] = st | ||
| } | ||
|
|
||
| // A long quiet period makes earlier failures stale: restart the backoff. | ||
| if time.Since(st.lastAttempt) > maxReconnectCooldown { | ||
| st.attempts = 0 | ||
| } | ||
|
|
||
| // Exponential backoff: the base cooldown doubles per attempt (shift capped at 6) | ||
| // and the result is clamped to maxReconnectCooldown. | ||
| backoff := min(reconnectCooldown*time.Duration(1<<min(st.attempts, 6)), maxReconnectCooldown) | ||
| if remaining := time.Until(st.lastAttempt.Add(backoff)); remaining > 0 { | ||
| // Still cooling down: re-queue the peer once the cooldown has elapsed. | ||
| time.AfterFunc(remaining, func() { | ||
| select { | ||
| case c.reconnectCh <- pid: | ||
| default: | ||
| } | ||
| }) | ||
| continue | ||
| } | ||
| st.lastAttempt = time.Now() | ||
|
|
||
| for _, sp := range c.seedPeers { | ||
| if sp.ID != pid { | ||
| continue | ||
| } | ||
| if c.isConnected(sp.ID) { | ||
| st.attempts = 0 | ||
| break | ||
| } | ||
| st.attempts++ | ||
| c.logger.Info().Str("peer", sp.ID.String()).Msg("reconnecting to disconnected seed peer") | ||
| info := sp | ||
| // Track the dial in maintenanceWg so Close waits for it. Adding here is safe | ||
| // because this loop goroutine is itself still counted by the wait group. | ||
| c.maintenanceWg.Go(func() { | ||
| dialCtx, cancelDial := context.WithTimeout(ctx, reconnectDialTimeout) | ||
| defer cancelDial() | ||
| if err := c.host.Connect(dialCtx, info); err != nil && ctx.Err() == nil { | ||
| c.logger.Warn().Str("peer", info.ID.String()).Err(err).Msg("failed to reconnect to seed peer") | ||
| // Re-queue so the loop schedules the next attempt with backoff. | ||
| select { | ||
| case c.reconnectCh <- info.ID: | ||
| default: | ||
| } | ||
| } | ||
| }) | ||
| break | ||
| } | ||
| case <-discoveryTicker.C: | ||
| c.refreshPeerDiscovery(ctx) | ||
| } | ||
| } | ||
| }) | ||
| } | ||
|
|
||
| // refreshPeerDiscovery re-advertises and re-runs peer discovery via DHT. | ||
| // | ||
| // It is a no-op when c.disc is nil. Advertising is done as one Advertise call | ||
| // per refresh, run in a goroutine tracked by maintenanceWg, instead of calling | ||
| // c.advertise on every tick and discarding its result. Peers returned by | ||
| // FindPeers that are neither this host nor already connected are dialed through | ||
| // tryConnect; connectSem bounds how many of those dials run at once, and each | ||
| // dial goroutine is tracked by maintenanceWg. The function returns early when | ||
| // ctx is cancelled while waiting for a free connect slot. | ||
| // | ||
| // NOTE(review): assumes c.disc exposes Advertise(ctx, ns, opts...) returning | ||
| // (time.Duration, error), and that this is only called from the maintenance | ||
| // goroutine (so adding to maintenanceWg is safe) — confirm. | ||
| func (c *Client) refreshPeerDiscovery(ctx context.Context) { | ||
| if c.disc == nil { | ||
| return | ||
| } | ||
|
|
||
| c.logger.Debug().Msg("refreshing peer discovery") | ||
|
|
||
| // Single, tracked advertise that ends with this call or with ctx; failures are logged rather than dropped. | ||
| c.maintenanceWg.Go(func() { | ||
| if _, err := c.disc.Advertise(ctx, c.getNamespace(), cdiscovery.TTL(reAdvertisePeriod)); err != nil && ctx.Err() == nil { | ||
| c.logger.Warn().Err(err).Msg("peer advertise refresh failed") | ||
| } | ||
| }) | ||
|
|
||
|
Comment on lines
+399
to
+400
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. 🧩 Analysis chain🌐 Web query:
💡 Result: In Behavior details:
Citations:
Fix goroutine leak in advertise refresh loop. Calling Start 🤖 Prompt for AI Agents |
||
| peerCh, err := c.disc.FindPeers(ctx, c.getNamespace(), cdiscovery.Limit(peerLimit)) | ||
| if err != nil { | ||
| c.logger.Warn().Err(err).Msg("peer discovery refresh failed") | ||
| return | ||
| } | ||
|
|
||
| for p := range peerCh { | ||
| if p.ID == c.host.ID() || c.isConnected(p.ID) { | ||
| continue | ||
| } | ||
| select { | ||
| case c.connectSem <- struct{}{}: | ||
| // Slot acquired: dial in a tracked goroutine and release the slot when done. | ||
| info := p | ||
| c.maintenanceWg.Go(func() { | ||
| defer func() { <-c.connectSem }() | ||
| c.tryConnect(ctx, info) | ||
| }) | ||
| case <-ctx.Done(): | ||
| return | ||
| } | ||
| } | ||
| } | ||
|
|
||
| // isConnected reports whether the host's network currently considers the | ||
| // given peer to be in the network.Connected state. | ||
| func (c *Client) isConnected(id peer.ID) bool { | ||
| state := c.host.Network().Connectedness(id) | ||
| return state == network.Connected | ||
| } | ||
|
|
||
| func (c *Client) listen() (host.Host, error) { | ||
| maddr, err := multiaddr.NewMultiaddr(c.conf.ListenAddress) | ||
| if err != nil { | ||
|
|
@@ -256,6 +436,7 @@ | |
|
|
||
| func (c *Client) setupDHT(ctx context.Context) error { | ||
| peers := c.parseAddrInfoList(c.conf.Peers) | ||
| c.seedPeers = peers | ||
| if len(peers) == 0 { | ||
| c.logger.Info().Msg("no peers - only listening for connections") | ||
| } | ||
|
|
||
Uh oh!
There was an error while loading. Please reload this page.