Skip to content

Commit 08a72fa

Browse files
committed
improve ethereum crawling
1 parent d84e97f commit 08a72fa

6 files changed

Lines changed: 286 additions & 35 deletions

File tree

pkg/discv5/crawler.go

Lines changed: 73 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -2,11 +2,8 @@ package discv5
22

33
import (
44
"context"
5-
"encoding/binary"
6-
"encoding/hex"
75
"encoding/json"
86
"fmt"
9-
"math/bits"
107
"strings"
118
"sync"
129
"time"
@@ -93,24 +90,26 @@ func (c *Crawler) PeerProperties(node *enode.Node) json.RawMessage {
9390

9491
var enrEntryEth2 ENREntryEth2
9592
if err := node.Load(&enrEntryEth2); err == nil {
96-
if beaconData, err := enrEntryEth2.Data(); err == nil {
97-
properties["fork_digest"] = beaconData.ForkDigest.String()
98-
properties["next_fork_version"] = beaconData.NextForkVersion.String()
99-
properties["next_fork_epoch"] = beaconData.NextForkEpoch.String()
100-
}
93+
properties["fork_digest"] = enrEntryEth2.ForkDigest.String()
94+
properties["next_fork_version"] = enrEntryEth2.NextForkVersion.String()
95+
properties["next_fork_epoch"] = enrEntryEth2.NextForkEpoch.String()
10196
}
10297

10398
var enrEntryAttnets ENREntryAttnets
10499
if err := node.Load(&enrEntryAttnets); err == nil {
105-
rawInt := binary.BigEndian.Uint64(enrEntryAttnets)
106-
properties["attnets_num"] = bits.OnesCount64(rawInt)
107-
properties["attnets"] = hex.EncodeToString(enrEntryAttnets)
100+
properties["attnets_num"] = enrEntryAttnets.AttnetsNum
101+
properties["attnets"] = enrEntryAttnets.Attnest
108102
}
109103

110104
var enrEntrySyncCommsSubnet ENREntrySyncCommsSubnet
111105
if err := node.Load(&enrEntrySyncCommsSubnet); err == nil {
112-
// check out https://github.com/prysmaticlabs/prysm/blob/203dc5f63b060821c2706f03a17d66b3813c860c/beacon-chain/p2p/subnets.go#L221
113-
properties["syncnets"] = hex.EncodeToString(enrEntrySyncCommsSubnet)
106+
properties["syncnets"] = enrEntrySyncCommsSubnet.SyncNets
107+
}
108+
109+
var enrEntryOpStack ENREntryOpStack
110+
if err := node.Load(&enrEntryOpStack); err == nil {
111+
properties["opstack_chain_id"] = enrEntryOpStack.ChainID
112+
properties["opstack_version"] = enrEntryOpStack.Version
114113
}
115114

116115
data, err := json.Marshal(properties)
@@ -202,17 +201,31 @@ func (c *Crawler) connect(ctx context.Context, pi peer.AddrInfo) error {
202201
if len(pi.Addrs) == 0 {
203202
metrics.VisitErrorsCount.With(metrics.CrawlLabel).Inc()
204203
return fmt.Errorf("skipping node as it has no public IP address") // change knownErrs map if changing this msg
204+
} else if len(pi.Addrs) == 1 {
205+
}
206+
207+
dialAddrInfo := peer.AddrInfo{
208+
ID: pi.ID,
209+
Addrs: ensureTCPAddr(pi.Addrs),
210+
}
211+
212+
replaced := false
213+
if len(dialAddrInfo.Addrs) != len(pi.Addrs) {
214+
replaced = true
205215
}
206216

207217
retry := 0
208218
maxRetries := 1
209219
for {
210220
timeout := time.Duration(c.cfg.DialTimeout.Nanoseconds() / int64(retry+1))
211221
timeoutCtx, cancel := context.WithTimeout(ctx, timeout)
212-
err := c.host.Connect(timeoutCtx, pi)
222+
err := c.host.Connect(timeoutCtx, dialAddrInfo)
213223
cancel()
214224

215225
if err == nil {
226+
if replaced {
227+
log.WithField("remoteID", pi.ID.ShortString()).Errorln("hat was gebracht!")
228+
}
216229
return nil
217230
}
218231

@@ -241,6 +254,52 @@ func (c *Crawler) connect(ctx context.Context, pi peer.AddrInfo) error {
241254
}
242255
}
243256

257+
func ensureTCPAddr(maddrs []ma.Multiaddr) []ma.Multiaddr {
258+
for _, maddr := range maddrs {
259+
if _, err := maddr.ValueForProtocol(ma.P_TCP); err == nil {
260+
return maddrs
261+
}
262+
}
263+
264+
newMaddrs := make([]ma.Multiaddr, 0, len(maddrs)+1)
265+
266+
for i, maddr := range maddrs {
267+
newMaddrs = append(newMaddrs, maddr)
268+
269+
udp, err := maddr.ValueForProtocol(ma.P_UDP)
270+
if err != nil {
271+
continue
272+
}
273+
274+
ip := ""
275+
ip4, err := maddr.ValueForProtocol(ma.P_IP4)
276+
if err != nil {
277+
ip6, err := maddr.ValueForProtocol(ma.P_IP6)
278+
if err != nil {
279+
continue
280+
}
281+
ip = "/ip6/" + ip6
282+
} else {
283+
ip = "/ip4/" + ip4
284+
}
285+
286+
tcpMaddr, err := ma.NewMultiaddr(ip + "/tcp/" + udp)
287+
if err != nil {
288+
continue
289+
}
290+
291+
for _, remaining := range maddrs[i+1:] {
292+
newMaddrs = append(newMaddrs, remaining)
293+
}
294+
295+
newMaddrs = append(newMaddrs, tcpMaddr)
296+
297+
return newMaddrs
298+
}
299+
300+
return maddrs
301+
}
302+
244303
// identifyWait waits until any connection to a peer passed the Identify
245304
// exchange successfully or all identification attempts have failed.
246305
// The call to IdentifyWait returns immediately if the connection was

pkg/discv5/crawler_test.go

Lines changed: 60 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,60 @@
1+
package discv5
2+
3+
import (
4+
"testing"
5+
6+
ma "github.com/multiformats/go-multiaddr"
7+
"github.com/stretchr/testify/assert"
8+
9+
"github.com/dennis-tra/nebula-crawler/pkg/nebtest"
10+
)
11+
12+
func Test_ensureTCPAddr(t *testing.T) {
13+
tests := []struct {
14+
name string
15+
maddrs []ma.Multiaddr
16+
want []ma.Multiaddr
17+
}{
18+
{
19+
name: "empty",
20+
maddrs: []ma.Multiaddr{},
21+
want: []ma.Multiaddr{},
22+
},
23+
{
24+
name: "tcp exists",
25+
maddrs: []ma.Multiaddr{
26+
nebtest.MustMultiaddr(t, "/ip4/123.1.1.1/udp/1234"),
27+
nebtest.MustMultiaddr(t, "/ip4/123.1.1.1/tcp/1234"),
28+
},
29+
want: []ma.Multiaddr{
30+
nebtest.MustMultiaddr(t, "/ip4/123.1.1.1/udp/1234"),
31+
nebtest.MustMultiaddr(t, "/ip4/123.1.1.1/tcp/1234"),
32+
},
33+
},
34+
{
35+
name: "single udp ip4",
36+
maddrs: []ma.Multiaddr{
37+
nebtest.MustMultiaddr(t, "/ip4/123.1.1.1/udp/1234"),
38+
},
39+
want: []ma.Multiaddr{
40+
nebtest.MustMultiaddr(t, "/ip4/123.1.1.1/udp/1234"),
41+
nebtest.MustMultiaddr(t, "/ip4/123.1.1.1/tcp/1234"),
42+
},
43+
},
44+
{
45+
name: "single udp ip6",
46+
maddrs: []ma.Multiaddr{
47+
nebtest.MustMultiaddr(t, "/ip6/::1/udp/1234"),
48+
},
49+
want: []ma.Multiaddr{
50+
nebtest.MustMultiaddr(t, "/ip6/::1/udp/1234"),
51+
nebtest.MustMultiaddr(t, "/ip6/::1/tcp/1234"),
52+
},
53+
},
54+
}
55+
for _, tt := range tests {
56+
t.Run(tt.name, func(t *testing.T) {
57+
assert.Equalf(t, tt.want, ensureTCPAddr(tt.maddrs), "ensureTCPAddr(%v)", tt.maddrs)
58+
})
59+
}
60+
}

pkg/discv5/dialer.go

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,8 @@ type Dialer struct {
2020

2121
var _ core.Worker[PeerInfo, core.DialResult[PeerInfo]] = (*Dialer)(nil)
2222

23-
// Work TODO
23+
// Work takes the PeerInfo object and tries to figure out if the peer is
24+
// still online.
2425
func (d *Dialer) Work(ctx context.Context, task PeerInfo) (core.DialResult[PeerInfo], error) {
2526
// Creating log entry
2627
logEntry := log.WithFields(log.Fields{

pkg/discv5/driver_crawler.go

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@ import (
55
"crypto/elliptic"
66
crand "crypto/rand"
77
"fmt"
8+
"io"
89
"net"
910
"time"
1011

@@ -15,6 +16,7 @@ import (
1516
"github.com/libp2p/go-libp2p"
1617
"github.com/libp2p/go-libp2p/core/crypto"
1718
"github.com/libp2p/go-libp2p/core/host"
19+
"github.com/libp2p/go-libp2p/core/network"
1820
"github.com/libp2p/go-libp2p/core/peer"
1921
basichost "github.com/libp2p/go-libp2p/p2p/host/basic"
2022
rcmgr "github.com/libp2p/go-libp2p/p2p/host/resource-manager"
@@ -157,6 +159,7 @@ func NewCrawlDriver(dbc db.Client, crawl *models.Crawl, cfg *CrawlDriverConfig)
157159

158160
privBytes := elliptic.Marshal(ethcrypto.S256(), ecdsaKey.X, ecdsaKey.Y)
159161
secpKey := (*crypto.Secp256k1PrivateKey)(secp256k1v4.PrivKeyFromBytes(privBytes))
162+
160163
// Initialize a single libp2p node that's shared between all crawlers.
161164
// Context: https://github.com/ethereum/consensus-specs/blob/dev/specs/phase0/p2p-interface.md#network-fundamentals
162165
h, err := libp2p.New(
@@ -173,6 +176,19 @@ func NewCrawlDriver(dbc db.Client, crawl *models.Crawl, cfg *CrawlDriverConfig)
173176
return nil, fmt.Errorf("new libp2p host: %w", err)
174177
}
175178

179+
// According to Diva, these are required protocols. Some of them are just
180+
// assumed to be required. We just read from the stream indefinitely to
181+
// gain time for the identify exchange to finish. We just pretend to support
182+
// these protocols and keep the stream busy until we have gathered all the
183+
// information we were interested in. This includes the agent version and
184+
// all supported protocols.
185+
h.SetStreamHandler("/eth2/beacon_chain/req/ping/1/ssz_snappy", func(s network.Stream) { io.ReadAll(s) })
186+
h.SetStreamHandler("/eth2/beacon_chain/req/status/1/ssz_snappy", func(s network.Stream) { io.ReadAll(s) })
187+
h.SetStreamHandler("/eth2/beacon_chain/req/metadata/1/ssz_snappy", func(s network.Stream) { io.ReadAll(s) })
188+
h.SetStreamHandler("/eth2/beacon_chain/req/metadata/2/ssz_snappy", func(s network.Stream) { io.ReadAll(s) })
189+
h.SetStreamHandler("/eth2/beacon_chain/req/goodbye/1/ssz_snappy", func(s network.Stream) { io.ReadAll(s) })
190+
h.SetStreamHandler("/meshsub/1.1.0", func(s network.Stream) { io.ReadAll(s) }) // https://github.com/ethereum/consensus-specs/blob/dev/specs/phase0/p2p-interface.md#the-gossip-domain-gossipsub
191+
176192
log.WithField("peerID", h.ID().String()).Infoln("Started libp2p host")
177193

178194
nodesMap := map[enode.ID]*enode.Node{}

pkg/discv5/driver_crawler_test.go

Lines changed: 53 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,53 @@
1+
package discv5
2+
3+
import (
4+
"testing"
5+
6+
"github.com/ethereum/go-ethereum/p2p/enode"
7+
"github.com/stretchr/testify/assert"
8+
"github.com/stretchr/testify/require"
9+
)
10+
11+
func TestNewPeerInfo(t *testing.T) {
12+
type args struct {
13+
node *enode.Node
14+
}
15+
tests := []struct {
16+
name string
17+
enr string
18+
peerID string
19+
maddrs []string
20+
}{
21+
{
22+
name: "bootstrap 1",
23+
enr: "enr:-Ku4QEWzdnVtXc2Q0ZVigfCGggOVB2Vc1ZCPEc6j21NIFLODSJbvNaef1g4PxhPwl_3kax86YPheFUSLXPRs98vvYsoBh2F0dG5ldHOIAAAAAAAAAACEZXRoMpC1MD8qAAAAAP__________gmlkgnY0gmlwhDZBrP2Jc2VjcDI1NmsxoQM6jr8Rb1ktLEsVcKAPa08wCsKUmvoQ8khiOl_SLozf9IN1ZHCCIyg",
24+
peerID: "16Uiu2HAmGbaF2hpC4iumV7eWiK51nWmsZf7mK2R9nCM6rrvRQCwu",
25+
maddrs: []string{
26+
"/ip4/54.65.172.253/udp/9000",
27+
},
28+
},
29+
{
30+
name: "bootstrap 2",
31+
enr: "enr:-LK4QKWrXTpV9T78hNG6s8AM6IO4XH9kFT91uZtFg1GcsJ6dKovDOr1jtAAFPnS2lvNltkOGA9k29BUN7lFh_sjuc9QBh2F0dG5ldHOIAAAAAAAAAACEZXRoMpC1MD8qAAAAAP__________gmlkgnY0gmlwhANAdd-Jc2VjcDI1NmsxoQLQa6ai7y9PMN5hpLe5HmiJSlYzMuzP7ZhwRiwHvqNXdoN0Y3CCI4yDdWRwgiOM",
32+
peerID: "16Uiu2HAm9TFzGRAgJKVyfZ54nDbCYNwEAFnpz6ybjM7KaVPLJZpy",
33+
maddrs: []string{
34+
"/ip4/3.64.117.223/udp/9100",
35+
"/ip4/3.64.117.223/tcp/9100",
36+
},
37+
},
38+
}
39+
for _, tt := range tests {
40+
t.Run(tt.name, func(t *testing.T) {
41+
node, err := enode.Parse(enode.ValidSchemes, tt.enr)
42+
require.NoError(t, err)
43+
44+
got, err := NewPeerInfo(node)
45+
assert.Nil(t, err)
46+
assert.Equal(t, tt.peerID, got.peerID.String())
47+
48+
for i, maddr := range got.maddrs {
49+
assert.Equal(t, tt.maddrs[i], maddr.String())
50+
}
51+
})
52+
}
53+
}

0 commit comments

Comments
 (0)