Skip to content

Commit b8c6c5a

Browse files
committed
handle immediately closed connection
1 parent 9fa221c commit b8c6c5a

9 files changed

Lines changed: 150 additions & 94 deletions

File tree

.gitignore

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -256,3 +256,5 @@ out
256256
.tool-versions
257257
**/*.dat
258258
**/*.mmdb
259+
*.logs
260+
*.log

cmd/nebula/cmd_crawl.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -443,7 +443,7 @@ func storeNeighbors[I core.PeerInfo[I]](ctx context.Context, dbc db.Client, dbCr
443443
}
444444
}
445445
log.WithFields(log.Fields{
446-
"duration": time.Since(start),
446+
"duration": time.Since(start).String(),
447447
"avg": fmt.Sprintf("%.2fms", time.Since(start).Seconds()/float64(len(handler.RoutingTables))*1000),
448448
"peers": len(handler.RoutingTables),
449449
"totalNeighbors": neighborsCount,

db/errors.go

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -35,7 +35,6 @@ var KnownErrors = map[string]string{
3535
"can't assign requested address": models.NetErrorCantAssignRequestedAddress, // transient error
3636
"cannot assign requested address": models.NetErrorCantAssignRequestedAddress, // transient error
3737
"connection gated": models.NetErrorConnectionGated, // transient error
38-
"connection closed immediately": models.NetErrorConnectionClosedImmediately,
3938
}
4039

4140
var ErrorStr = map[string]string{}
@@ -75,7 +74,6 @@ var knownErrorsPrecedence = []string{
7574
"failed to negotiate stream multiplexer",
7675
"resource limit exceeded",
7776
"Write on stream",
78-
"connection closed immediately",
7977
}
8078

8179
// NetError extracts the appropriate error type from the given error.

db/migrations/000026_add_net_errors.up.sql

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,6 @@ BEGIN;
33
ALTER TYPE net_error ADD VALUE 'connection_reset_by_peer';
44
ALTER TYPE net_error ADD VALUE 'cant_assign_requested_address';
55
ALTER TYPE net_error ADD VALUE 'connection_gated';
6-
ALTER TYPE net_error ADD VALUE 'connection_closed_immediately';
76
ALTER TYPE net_error RENAME VALUE 'no_public_ip' TO 'no_ip_address';
87

98
CREATE OR REPLACE FUNCTION calc_max_failed_visits(

db/models/boil_types.go

Lines changed: 22 additions & 24 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

discv5/crawler.go

Lines changed: 76 additions & 45 deletions
Original file line numberDiff line numberDiff line change
@@ -60,6 +60,21 @@ func (c *Crawler) Work(ctx context.Context, task PeerInfo) (core.CrawlResult[Pee
6060
libp2pResult := <-libp2pResultCh
6161
discV5Result := <-discV5ResultCh
6262

63+
properties := c.PeerProperties(task.Node)
64+
65+
if libp2pResult.ConnClosedImmediately {
66+
properties["direct_close"] = true
67+
}
68+
69+
if libp2pResult.GenTCPAddr {
70+
properties["gen_tcp_addr"] = true
71+
}
72+
73+
data, err := json.Marshal(properties)
74+
if err != nil {
75+
log.WithError(err).WithField("properties", properties).Warnln("Could not marshal peer properties")
76+
}
77+
6378
cr := core.CrawlResult[PeerInfo]{
6479
CrawlerID: c.id,
6580
Info: task,
@@ -75,7 +90,7 @@ func (c *Crawler) Work(ctx context.Context, task PeerInfo) (core.CrawlResult[Pee
7590
CrawlEndTime: time.Now(),
7691
ConnectStartTime: libp2pResult.ConnectStartTime,
7792
ConnectEndTime: libp2pResult.ConnectEndTime,
78-
Properties: c.PeerProperties(task.Node),
93+
Properties: data,
7994
LogErrors: c.cfg.LogErrors,
8095
}
8196

@@ -85,7 +100,7 @@ func (c *Crawler) Work(ctx context.Context, task PeerInfo) (core.CrawlResult[Pee
85100
return cr, nil
86101
}
87102

88-
func (c *Crawler) PeerProperties(node *enode.Node) json.RawMessage {
103+
func (c *Crawler) PeerProperties(node *enode.Node) map[string]any {
89104
properties := map[string]any{}
90105

91106
properties["seq"] = node.Record().Seq()
@@ -119,23 +134,19 @@ func (c *Crawler) PeerProperties(node *enode.Node) json.RawMessage {
119134
properties["enr"] = node.String()
120135
}
121136

122-
data, err := json.Marshal(properties)
123-
if err != nil {
124-
log.WithError(err).WithField("properties", properties).Warnln("Could not marshal peer properties")
125-
return nil
126-
}
127-
128-
return data
137+
return properties
129138
}
130139

131140
type Libp2pResult struct {
132-
ConnectStartTime time.Time
133-
ConnectEndTime time.Time
134-
ConnectError error
135-
ConnectErrorStr string
136-
Agent string
137-
Protocols []string
138-
ListenAddrs []ma.Multiaddr
141+
ConnectStartTime time.Time
142+
ConnectEndTime time.Time
143+
ConnectError error
144+
ConnectErrorStr string
145+
Agent string
146+
Protocols []string
147+
ListenAddrs []ma.Multiaddr
148+
ConnClosedImmediately bool // whether conn was no error but still unconnected
149+
GenTCPAddr bool // whether a TCP address was generated
139150
}
140151

141152
func (c *Crawler) crawlLibp2p(ctx context.Context, pi PeerInfo) chan Libp2pResult {
@@ -144,9 +155,16 @@ func (c *Crawler) crawlLibp2p(ctx context.Context, pi PeerInfo) chan Libp2pResul
144155
go func() {
145156
result := Libp2pResult{}
146157

158+
// sanitize the given addresses like removing UDP-only addresses and
159+
// adding corresponding TCP addresses.
160+
sanitizedAddrs, generated := sanitizeAddrs(pi.Addrs())
161+
162+
// keep track if we generated a TCP address to dial
163+
result.GenTCPAddr = generated
164+
147165
addrInfo := peer.AddrInfo{
148166
ID: pi.ID(),
149-
Addrs: pi.Addrs(),
167+
Addrs: sanitizedAddrs,
150168
}
151169

152170
result.ConnectStartTime = time.Now()
@@ -156,6 +174,22 @@ func (c *Crawler) crawlLibp2p(ctx context.Context, pi PeerInfo) chan Libp2pResul
156174
// If we could successfully connect to the peer we actually crawl it.
157175
if result.ConnectError == nil {
158176

177+
// check if we're connected
178+
if c.host.Network().Connectedness(pi.ID()) == network.NotConnected {
179+
// this is a weird behavior I was obesrving. Libp2p reports a
180+
// successful connection establishment but isn't connected right
181+
// after the call returned. This point is not a big problem at this
182+
// point because fetchNeighbors will open the connection again. This
183+
// works more often than not but is still weird. At least keep track
184+
// of these cases.
185+
result.ConnClosedImmediately = true
186+
187+
// try it again one more time
188+
if !c.isIdentified(addrInfo.ID) {
189+
_ = c.connect(ctx, addrInfo)
190+
}
191+
}
192+
159193
// wait for the Identify exchange to complete
160194
c.identifyWait(ctx, addrInfo)
161195

@@ -207,11 +241,6 @@ func (c *Crawler) connect(ctx context.Context, pi peer.AddrInfo) error {
207241
return fmt.Errorf("skipping node as it has no public IP address") // change knownErrs map if changing this msg
208242
}
209243

210-
dialAddrInfo := peer.AddrInfo{
211-
ID: pi.ID,
212-
Addrs: sanitizeAddrs(pi.Addrs),
213-
}
214-
215244
var (
216245
retry int = 0
217246
maxRetries int = 2
@@ -221,22 +250,16 @@ func (c *Crawler) connect(ctx context.Context, pi peer.AddrInfo) error {
221250
for {
222251
logEntry := log.WithFields(log.Fields{
223252
"timeout": c.cfg.DialTimeout.String(),
224-
"remoteID": dialAddrInfo.ID.String(),
253+
"remoteID": pi.ID.String(),
225254
"retry": retry,
226-
"maddrs": dialAddrInfo.Addrs,
255+
"maddrs": pi.Addrs,
227256
})
228-
logEntry.Debugln("Connecting to peer", dialAddrInfo.ID.ShortString())
257+
logEntry.Debugln("Connecting to peer", pi.ID.ShortString())
229258

230259
timeoutCtx, cancel := context.WithTimeout(ctx, c.cfg.DialTimeout)
231-
err := c.host.Connect(timeoutCtx, dialAddrInfo)
260+
err := c.host.Connect(timeoutCtx, pi)
232261
cancel()
233262

234-
// if libp2p says we established a connection, but we're not actually
235-
// connected, assign a custom error.
236-
if err == nil && c.host.Network().Connectedness(pi.ID) != network.Connected {
237-
err = fmt.Errorf("connection closed immediately")
238-
}
239-
240263
// if we still don't have an error (despite the above custom error
241264
// handling), we return to the caller.
242265
if err == nil {
@@ -247,29 +270,31 @@ func (c *Crawler) connect(ctx context.Context, pi peer.AddrInfo) error {
247270
// because subsequent connection attempts have a shorter timeout which
248271
// means that it's more likely to run into a context.DeadlineExceeded
249272
// error. If that's the case, we return the original error for tracking
250-
// purposes.
273+
// purposes. If the error is nil, we're not connected and not identified
274+
// firstErr will stay nil. This is fine because we'll track that outside
275+
// of this connect call.
251276
if firstErr == nil {
252277
firstErr = err
253278
}
254279

255280
switch true {
256-
case strings.Contains(err.Error(), db.ErrorStr[models.NetErrorNegotiateSecurityProtocol]):
257281
case strings.Contains(err.Error(), db.ErrorStr[models.NetErrorConnectionRefused]):
258-
case strings.Contains(err.Error(), db.ErrorStr[models.NetErrorConnectionResetByPeer]):
259-
case strings.Contains(err.Error(), db.ErrorStr[models.NetErrorConnectionClosedImmediately]):
282+
case strings.Contains(err.Error(), db.ErrorStr[models.NetErrorConnectionGated]):
283+
case strings.Contains(err.Error(), db.ErrorStr[models.NetErrorCantAssignRequestedAddress]):
284+
// case strings.Contains(err.Error(), db.ErrorStr[models.NetErrorConnectionResetByPeer]): often because of mismatching peer ids, so excluding it for now
260285
default:
261286
if errors.Is(err, context.DeadlineExceeded) {
262287
err = firstErr
263288
}
264-
logEntry.WithError(err).Debugln("Failed connecting to peer", dialAddrInfo.ID.ShortString())
289+
logEntry.WithError(err).Debugln("Failed connecting to peer", pi.ID.ShortString())
265290
return err
266291
}
267292

268293
if retry == maxRetries {
269294
if errors.Is(err, context.DeadlineExceeded) {
270295
err = firstErr
271296
}
272-
logEntry.WithError(err).Debugln("Exceeded retries connecting to peer", dialAddrInfo.ID.ShortString())
297+
logEntry.WithError(err).Debugln("Exceeded retries connecting to peer", pi.ID.ShortString())
273298
return err
274299
}
275300

@@ -291,7 +316,7 @@ func (c *Crawler) connect(ctx context.Context, pi peer.AddrInfo) error {
291316
// there is no other reliable transport address like TCP or QUIC we use the UDP
292317
// IP address + port and craft a TCP address out of it. The UDP address will
293318
// still be removed and replaced with TCP.
294-
func sanitizeAddrs(maddrs []ma.Multiaddr) []ma.Multiaddr {
319+
func sanitizeAddrs(maddrs []ma.Multiaddr) ([]ma.Multiaddr, bool) {
295320
newMaddrs := make([]ma.Multiaddr, 0, len(maddrs))
296321
for _, maddr := range maddrs {
297322
if _, err := maddr.ValueForProtocol(ma.P_TCP); err == nil {
@@ -306,7 +331,7 @@ func sanitizeAddrs(maddrs []ma.Multiaddr) []ma.Multiaddr {
306331
}
307332

308333
if len(newMaddrs) > 0 {
309-
return newMaddrs
334+
return newMaddrs, false
310335
}
311336

312337
for i, maddr := range maddrs {
@@ -338,10 +363,10 @@ func sanitizeAddrs(maddrs []ma.Multiaddr) []ma.Multiaddr {
338363

339364
newMaddrs = append(newMaddrs, tcpMaddr)
340365

341-
return newMaddrs
366+
return newMaddrs, true
342367
}
343368

344-
return maddrs
369+
return maddrs, false
345370
}
346371

347372
// identifyWait waits until any connection to a peer passed the Identify
@@ -350,7 +375,7 @@ func sanitizeAddrs(maddrs []ma.Multiaddr) []ma.Multiaddr {
350375
// identified in the past. We detect a successful identification if an
351376
// AgentVersion is stored in the peer store
352377
func (c *Crawler) identifyWait(ctx context.Context, pi peer.AddrInfo) {
353-
timeoutCtx, cancel := context.WithTimeout(ctx, 15*time.Second) // TODO: parameterize
378+
timeoutCtx, cancel := context.WithTimeout(ctx, 5*time.Second) // TODO: parameterize
354379
defer cancel()
355380

356381
var wg sync.WaitGroup
@@ -368,8 +393,7 @@ func (c *Crawler) identifyWait(ctx context.Context, pi peer.AddrInfo) {
368393
// check if identification was successful by looking for
369394
// the AgentVersion key. If it exists, we cancel the
370395
// identification of the remaining connections.
371-
agent, err := c.host.Peerstore().Get(pi.ID, "AgentVersion")
372-
if err == nil && agent.(string) != "" {
396+
if c.isIdentified(pi.ID) {
373397
cancel()
374398
return
375399
}
@@ -380,6 +404,13 @@ func (c *Crawler) identifyWait(ctx context.Context, pi peer.AddrInfo) {
380404
wg.Wait()
381405
}
382406

407+
// isIdentified returns true if the given peer.ID was successfully identified.
408+
// Just because IdentifyWait returns doesn't mean the peer was identified.
409+
func (c *Crawler) isIdentified(pid peer.ID) bool {
410+
agent, err := c.host.Peerstore().Get(pid, "AgentVersion")
411+
return err == nil && agent.(string) != ""
412+
}
413+
383414
type DiscV5Result struct {
384415
// The time we received the first successful response
385416
RespondedAt *time.Time

0 commit comments

Comments
 (0)