Skip to content

Commit 8bbbbca

Browse files
committed
discv5: improve address sanitization
1 parent b50f7bf commit 8bbbbca

2 files changed

Lines changed: 94 additions & 31 deletions

File tree

discv5/crawler.go

Lines changed: 54 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -3,13 +3,13 @@ package discv5
33
import (
44
"context"
55
"encoding/json"
6+
"errors"
67
"fmt"
78
"strings"
89
"sync"
910
"time"
1011

1112
"github.com/ethereum/go-ethereum/p2p/enode"
12-
"github.com/friendsofgo/errors"
1313
"github.com/libp2p/go-libp2p/core/network"
1414
"github.com/libp2p/go-libp2p/core/peer"
1515
"github.com/libp2p/go-libp2p/p2p/host/basic"
@@ -205,38 +205,51 @@ func (c *Crawler) crawlLibp2p(ctx context.Context, pi PeerInfo) chan Libp2pResul
205205
func (c *Crawler) connect(ctx context.Context, pi peer.AddrInfo) error {
206206
if len(pi.Addrs) == 0 {
207207
return fmt.Errorf("skipping node as it has no public IP address") // change knownErrs map if changing this msg
208-
} else if len(pi.Addrs) == 1 {
209208
}
210209

211210
dialAddrInfo := peer.AddrInfo{
212211
ID: pi.ID,
213-
Addrs: ensureTCPAddr(pi.Addrs),
212+
Addrs: sanitizeAddrs(pi.Addrs),
214213
}
215214

216-
retry := 0
217-
maxRetries := 2
218-
for {
219-
220-
timeout := time.Duration(c.cfg.DialTimeout.Nanoseconds() / int64(retry+1))
215+
var (
216+
retry int = 0
217+
maxRetries int = 2
218+
firstErr error = nil
219+
)
221220

221+
for {
222222
logEntry := log.WithFields(log.Fields{
223-
"timeout": timeout.String(),
223+
"timeout": c.cfg.DialTimeout.String(),
224224
"remoteID": dialAddrInfo.ID.String(),
225225
"retry": retry,
226226
"maddrs": dialAddrInfo.Addrs,
227227
})
228228
logEntry.Debugln("Connecting to peer", dialAddrInfo.ID.ShortString())
229229

230-
timeoutCtx, cancel := context.WithTimeout(ctx, timeout)
230+
timeoutCtx, cancel := context.WithTimeout(ctx, c.cfg.DialTimeout)
231231
err := c.host.Connect(timeoutCtx, dialAddrInfo)
232232
cancel()
233233

234+
// if libp2p says we established a connection, but we're not actually
235+
// connected, assign a custom error.
236+
if err == nil && c.host.Network().Connectedness(pi.ID) != network.Connected {
237+
err = fmt.Errorf("connection closed immediately")
238+
}
239+
240+
// if we still don't have an error (despite the above custom error
241+
// handling), we return to the caller.
234242
if err == nil {
235-
if c.host.Network().Connectedness(pi.ID) != network.Connected {
236-
err = fmt.Errorf("connection closed immediately")
237-
} else {
238-
return nil
239-
}
243+
return nil
244+
}
245+
246+
// at this point we know something went wrong. Track the first error
247+
// because subsequent connection attempts have a shorter timeout which
248+
// means that it's more likely to run into a context.DeadlineExceeded
249+
// error. If that's the case, we return the original error for tracking
250+
// purposes.
251+
if firstErr == nil {
252+
firstErr = err
240253
}
241254

242255
switch true {
@@ -245,38 +258,58 @@ func (c *Crawler) connect(ctx context.Context, pi peer.AddrInfo) error {
245258
case strings.Contains(err.Error(), db.ErrorStr[models.NetErrorConnectionResetByPeer]):
246259
case strings.Contains(err.Error(), db.ErrorStr[models.NetErrorConnectionClosedImmediately]):
247260
default:
261+
if errors.Is(err, context.DeadlineExceeded) {
262+
err = firstErr
263+
}
248264
logEntry.WithError(err).Debugln("Failed connecting to peer", dialAddrInfo.ID.ShortString())
249265
return err
250266
}
251267

252268
if retry == maxRetries {
269+
if errors.Is(err, context.DeadlineExceeded) {
270+
err = firstErr
271+
}
253272
logEntry.WithError(err).Debugln("Exceeded retries connecting to peer", dialAddrInfo.ID.ShortString())
254273
return err
255274
}
256275

276+
sleep := time.Second * time.Duration(3*(retry+1)) // TODO: parameterize
277+
257278
select {
258279
case <-ctx.Done():
259280
return ctx.Err()
260-
case <-time.After(time.Second * time.Duration(3*(retry+1))): // TODO: parameterize
281+
case <-time.After(sleep):
261282
retry += 1
262283
continue
263284
}
264285

265286
}
266287
}
267288

268-
func ensureTCPAddr(maddrs []ma.Multiaddr) []ma.Multiaddr {
289+
// sanitizeAddrs takes the list of multi addresses and removes any UDP-only
290+
// multi address because we cannot dial UDP only addresses anyway. However, if
291+
// there is no other reliable transport address like TCP or QUIC we use the UDP
292+
// IP address + port and craft a TCP address out of it. The UDP address will
293+
// still be removed and replaced with TCP.
294+
func sanitizeAddrs(maddrs []ma.Multiaddr) []ma.Multiaddr {
295+
newMaddrs := make([]ma.Multiaddr, 0, len(maddrs))
269296
for _, maddr := range maddrs {
270297
if _, err := maddr.ValueForProtocol(ma.P_TCP); err == nil {
271-
return maddrs
298+
newMaddrs = append(newMaddrs, maddr)
299+
} else if _, err := maddr.ValueForProtocol(ma.P_UDP); err == nil {
300+
_, quicErr := maddr.ValueForProtocol(ma.P_QUIC)
301+
_, quicV1Err := maddr.ValueForProtocol(ma.P_QUIC_V1)
302+
if quicErr == nil || quicV1Err == nil {
303+
newMaddrs = append(newMaddrs, maddr)
304+
}
272305
}
273306
}
274307

275-
newMaddrs := make([]ma.Multiaddr, 0, len(maddrs)+1)
308+
if len(newMaddrs) > 0 {
309+
return newMaddrs
310+
}
276311

277312
for i, maddr := range maddrs {
278-
newMaddrs = append(newMaddrs, maddr)
279-
280313
udp, err := maddr.ValueForProtocol(ma.P_UDP)
281314
if err != nil {
282315
continue

discv5/crawler_test.go

Lines changed: 40 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,7 @@ import (
99
"github.com/dennis-tra/nebula-crawler/nebtest"
1010
)
1111

12-
func Test_ensureTCPAddr(t *testing.T) {
12+
func Test_sanitizeAddrs(t *testing.T) {
1313
tests := []struct {
1414
name string
1515
maddrs []ma.Multiaddr
@@ -23,22 +23,53 @@ func Test_ensureTCPAddr(t *testing.T) {
2323
{
2424
name: "tcp exists",
2525
maddrs: []ma.Multiaddr{
26-
nebtest.MustMultiaddr(t, "/ip4/123.1.1.1/udp/1234"),
27-
nebtest.MustMultiaddr(t, "/ip4/123.1.1.1/tcp/1234"),
26+
nebtest.MustMultiaddr(t, "/ip4/123.4.5.6/udp/1234"),
27+
nebtest.MustMultiaddr(t, "/ip4/123.4.5.6/tcp/1234"),
2828
},
2929
want: []ma.Multiaddr{
30-
nebtest.MustMultiaddr(t, "/ip4/123.1.1.1/udp/1234"),
31-
nebtest.MustMultiaddr(t, "/ip4/123.1.1.1/tcp/1234"),
30+
nebtest.MustMultiaddr(t, "/ip4/123.4.5.6/tcp/1234"),
31+
},
32+
},
33+
{
34+
name: "quic exist",
35+
maddrs: []ma.Multiaddr{
36+
nebtest.MustMultiaddr(t, "/ip4/123.4.5.6/udp/1234/quic"),
37+
},
38+
want: []ma.Multiaddr{
39+
nebtest.MustMultiaddr(t, "/ip4/123.4.5.6/udp/1234/quic"),
40+
},
41+
},
42+
{
43+
name: "tcp and quic exist",
44+
maddrs: []ma.Multiaddr{
45+
nebtest.MustMultiaddr(t, "/ip4/123.4.5.6/udp/1234"),
46+
nebtest.MustMultiaddr(t, "/ip4/123.4.5.6/tcp/1234"),
47+
nebtest.MustMultiaddr(t, "/ip4/123.4.5.6/udp/1234/quic"),
48+
},
49+
want: []ma.Multiaddr{
50+
nebtest.MustMultiaddr(t, "/ip4/123.4.5.6/tcp/1234"),
51+
nebtest.MustMultiaddr(t, "/ip4/123.4.5.6/udp/1234/quic"),
52+
},
53+
},
54+
{
55+
name: "tcp and quicv1 exist",
56+
maddrs: []ma.Multiaddr{
57+
nebtest.MustMultiaddr(t, "/ip4/123.4.5.6/udp/1234"),
58+
nebtest.MustMultiaddr(t, "/ip4/123.4.5.6/tcp/1234"),
59+
nebtest.MustMultiaddr(t, "/ip4/123.4.5.6/udp/1234/quic-v1"),
60+
},
61+
want: []ma.Multiaddr{
62+
nebtest.MustMultiaddr(t, "/ip4/123.4.5.6/tcp/1234"),
63+
nebtest.MustMultiaddr(t, "/ip4/123.4.5.6/udp/1234/quic-v1"),
3264
},
3365
},
3466
{
3567
name: "single udp ip4",
3668
maddrs: []ma.Multiaddr{
37-
nebtest.MustMultiaddr(t, "/ip4/123.1.1.1/udp/1234"),
69+
nebtest.MustMultiaddr(t, "/ip4/123.4.5.6/udp/1234"),
3870
},
3971
want: []ma.Multiaddr{
40-
nebtest.MustMultiaddr(t, "/ip4/123.1.1.1/udp/1234"),
41-
nebtest.MustMultiaddr(t, "/ip4/123.1.1.1/tcp/1234"),
72+
nebtest.MustMultiaddr(t, "/ip4/123.4.5.6/tcp/1234"),
4273
},
4374
},
4475
{
@@ -47,14 +78,13 @@ func Test_ensureTCPAddr(t *testing.T) {
4778
nebtest.MustMultiaddr(t, "/ip6/::1/udp/1234"),
4879
},
4980
want: []ma.Multiaddr{
50-
nebtest.MustMultiaddr(t, "/ip6/::1/udp/1234"),
5181
nebtest.MustMultiaddr(t, "/ip6/::1/tcp/1234"),
5282
},
5383
},
5484
}
5585
for _, tt := range tests {
5686
t.Run(tt.name, func(t *testing.T) {
57-
assert.Equalf(t, tt.want, ensureTCPAddr(tt.maddrs), "ensureTCPAddr(%v)", tt.maddrs)
87+
assert.Equalf(t, tt.want, sanitizeAddrs(tt.maddrs), "sanitizeAddrs(%v)", tt.maddrs)
5888
})
5989
}
6090
}

0 commit comments

Comments
 (0)