Skip to content

Commit e7a845b

Browse files
committed
add discv4 ethereum execution layer support
1 parent 8b6aa61 commit e7a845b

11 files changed

Lines changed: 906 additions & 12 deletions

File tree

cmd/nebula/cmd_crawl.go

Lines changed: 43 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@ import (
2121
"github.com/dennis-tra/nebula-crawler/core"
2222
"github.com/dennis-tra/nebula-crawler/db"
2323
"github.com/dennis-tra/nebula-crawler/db/models"
24+
"github.com/dennis-tra/nebula-crawler/discv4"
2425
"github.com/dennis-tra/nebula-crawler/discv5"
2526
"github.com/dennis-tra/nebula-crawler/libp2p"
2627
)
@@ -233,6 +234,47 @@ func CrawlAction(c *cli.Context) error {
233234
}
234235

235236
switch cfg.Network {
237+
case string(config.NetworkEthExec):
238+
// configure the crawl driver
239+
driverCfg := &discv4.CrawlDriverConfig{
240+
Version: cfg.Root.Version(),
241+
DialTimeout: cfg.Root.DialTimeout,
242+
TrackNeighbors: cfg.PersistNeighbors,
243+
BootstrapPeerStrs: cfg.BootstrapPeers.Value(),
244+
AddrDialType: cfg.AddrDialType(),
245+
AddrTrackType: cfg.AddrTrackType(),
246+
TracerProvider: cfg.Root.TracerProvider,
247+
MeterProvider: cfg.Root.MeterProvider,
248+
LogErrors: cfg.Root.LogErrors,
249+
}
250+
251+
// init the crawl driver
252+
driver, err := discv4.NewCrawlDriver(dbc, dbCrawl, driverCfg)
253+
if err != nil {
254+
return fmt.Errorf("new discv4 driver: %w", err)
255+
}
256+
257+
// init the result handler
258+
handler := core.NewCrawlHandler[discv4.PeerInfo](handlerCfg)
259+
260+
// put everything together and init the engine that'll run the crawl
261+
eng, err := core.NewEngine[discv4.PeerInfo, core.CrawlResult[discv4.PeerInfo]](driver, handler, engineCfg)
262+
if err != nil {
263+
return fmt.Errorf("new engine: %w", err)
264+
}
265+
266+
// finally, start the crawl
267+
queuedPeers, runErr := eng.Run(ctx)
268+
269+
// a bit ugly but, but the handler will contain crawl statistics, that
270+
// we'll save to the database and print to the screen
271+
handler.QueuedPeers = len(queuedPeers)
272+
if err := persistCrawlInformation(dbc, dbCrawl, handler, runErr); err != nil {
273+
return fmt.Errorf("persist crawl information: %w", err)
274+
}
275+
276+
return nil
277+
236278
case string(config.NetworkEthCons),
237279
string(config.NetworkHolesky): // use a different driver etc. for the Ethereum consensus layer + Holeksy Testnet
238280
// configure the crawl driver
@@ -252,7 +294,7 @@ func CrawlAction(c *cli.Context) error {
252294
// init the crawl driver
253295
driver, err := discv5.NewCrawlDriver(dbc, dbCrawl, driverCfg)
254296
if err != nil {
255-
return fmt.Errorf("new driver: %w", err)
297+
return fmt.Errorf("new discv5 driver: %w", err)
256298
}
257299

258300
// init the result handler

config/bootstrap.go

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -201,6 +201,15 @@ var (
201201
"enr:-LK4QKWrXTpV9T78hNG6s8AM6IO4XH9kFT91uZtFg1GcsJ6dKovDOr1jtAAFPnS2lvNltkOGA9k29BUN7lFh_sjuc9QBh2F0dG5ldHOIAAAAAAAAAACEZXRoMpC1MD8qAAAAAP__________gmlkgnY0gmlwhANAdd-Jc2VjcDI1NmsxoQLQa6ai7y9PMN5hpLe5HmiJSlYzMuzP7ZhwRiwHvqNXdoN0Y3CCI4yDdWRwgiOM",
202202
}
203203

204+
// BootstrapPeersEthereumExecution extracted from:
205+
// https://github.com/ethereum/go-ethereum/blob/f04e5bde7487ce554930187e766164b18c37d867/params/bootnodes.go#L23
206+
BootstrapPeersEthereumExecution = []string{
207+
"enode://d860a01f9722d78051619d1e2351aba3f43f943f6f00718d1b9baa4101932a1f5011f16bb2b1bb35db20d6fe28fa0bf09636d26a87d31de9ec6203eeedb1f666@18.138.108.67:30303", // bootnode-aws-ap-southeast-1-001
208+
//"enode://22a8232c3abc76a16ae9d6c3b164f98775fe226f0917b0ca871128a74a8e9630b458460865bab457221f1d448dd9791d24c4e5d88786180ac185df813a68d4de@3.209.45.79:30303", // bootnode-aws-us-east-1-001
209+
//"enode://2b252ab6a1d0f971d9722cb839a42cb81db019ba44c08754628ab4a823487071b5695317c8ccd085219c3a03af063495b2f1da8d18218da2d6a82981b45e6ffc@65.108.70.101:30303", // bootnode-hetzner-hel
210+
//"enode://4aeb4ab6c14b23e2c4cfdce879c04b0748a20d8e9b59e25ded2a08143e265c6c25936e74cbc8e641e3312ca288673d91f2f93f8e277de3cfa444ecdaaf982052@157.90.35.166:30303", // bootnode-hetzner-fsn
211+
}
212+
204213
// BootstrapPeersHolesky extracted from:
205214
// https://github.com/eth-clients/holesky/blob/main/custom_config_data/bootstrap_nodes.txt
206215
BootstrapPeersHolesky = []string{

config/config.go

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@ const (
2828
NetworkMocha Network = "MOCHA"
2929
NetworkBlockRa Network = "BLOCKSPACE_RACE"
3030
NetworkEthCons Network = "ETHEREUM_CONSENSUS"
31+
NetworkEthExec Network = "ETHEREUM_EXECUTION"
3132
NetworkHolesky Network = "HOLESKY"
3233
)
3334

@@ -45,6 +46,7 @@ func Networks() []Network {
4546
NetworkMocha,
4647
NetworkBlockRa,
4748
NetworkEthCons,
49+
NetworkEthExec,
4850
NetworkHolesky,
4951
}
5052
}
@@ -285,6 +287,9 @@ func ConfigureNetwork(network string) (*cli.StringSlice, *cli.StringSlice, error
285287
case NetworkEthCons:
286288
bootstrapPeers = cli.NewStringSlice(BootstrapPeersEthereumConsensus...)
287289
protocols = cli.NewStringSlice("discv5") // TODO
290+
case NetworkEthExec:
291+
bootstrapPeers = cli.NewStringSlice(BootstrapPeersEthereumExecution...)
292+
protocols = cli.NewStringSlice("discv4") // TODO
288293
case NetworkHolesky:
289294
bootstrapPeers = cli.NewStringSlice(BootstrapPeersHolesky...)
290295
protocols = cli.NewStringSlice("discv5") // TODO

discv4/crawler.go

Lines changed: 208 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,208 @@
1+
package discv4
2+
3+
import (
4+
"context"
5+
"errors"
6+
"fmt"
7+
"net"
8+
"sync"
9+
"time"
10+
11+
"github.com/ethereum/go-ethereum/p2p/enode"
12+
log "github.com/sirupsen/logrus"
13+
"go.uber.org/atomic"
14+
"golang.org/x/sync/errgroup"
15+
16+
"github.com/dennis-tra/nebula-crawler/config"
17+
"github.com/dennis-tra/nebula-crawler/core"
18+
"github.com/dennis-tra/nebula-crawler/db"
19+
"github.com/dennis-tra/nebula-crawler/discvx"
20+
)
21+
22+
type CrawlerConfig struct {
23+
DialTimeout time.Duration
24+
AddrDialType config.AddrType
25+
LogErrors bool
26+
}
27+
28+
type Crawler struct {
29+
id string
30+
cfg *CrawlerConfig
31+
listener *discvx.UDPv4
32+
crawledPeers int
33+
done chan struct{}
34+
}
35+
36+
var _ core.Worker[PeerInfo, core.CrawlResult[PeerInfo]] = (*Crawler)(nil)
37+
38+
func (c *Crawler) Work(ctx context.Context, task PeerInfo) (core.CrawlResult[PeerInfo], error) {
39+
logEntry := log.WithFields(log.Fields{
40+
"crawlerID": c.id,
41+
"remoteID": task.peerID.ShortString(),
42+
"crawlCount": c.crawledPeers,
43+
})
44+
logEntry.Debugln("Crawling peer")
45+
defer logEntry.Debugln("Crawled peer")
46+
47+
crawlStart := time.Now()
48+
49+
result := c.crawlDiscV4(ctx, task)
50+
51+
cr := core.CrawlResult[PeerInfo]{
52+
CrawlerID: c.id,
53+
Info: task,
54+
CrawlStartTime: crawlStart,
55+
RoutingTableFromAPI: false,
56+
RoutingTable: result.RoutingTable,
57+
// Agent: libp2pResult.Agent,
58+
// Protocols: libp2pResult.Protocols,
59+
// ConnectError: libp2pResult.ConnectError,
60+
// ConnectErrorStr: libp2pResult.ConnectErrorStr,
61+
CrawlError: result.Error,
62+
CrawlErrorStr: result.ErrorStr,
63+
CrawlEndTime: time.Now(),
64+
// ConnectStartTime: libp2pResult.ConnectStartTime,
65+
// ConnectEndTime: libp2pResult.ConnectEndTime,
66+
// Properties: data,
67+
LogErrors: c.cfg.LogErrors,
68+
}
69+
70+
// We've now crawled this peer, so increment
71+
c.crawledPeers++
72+
73+
return cr, nil
74+
}
75+
76+
type DiscV4Result struct {
77+
// The time we received the first successful response
78+
RespondedAt *time.Time
79+
80+
// The updated ethereum node record
81+
ENR *enode.Node
82+
83+
// The neighbors of the crawled peer
84+
RoutingTable *core.RoutingTable[PeerInfo]
85+
86+
// The time the draining of bucket entries was finished
87+
DoneAt time.Time
88+
89+
// The combined error of crawling the peer's buckets
90+
Error error
91+
92+
// The above error mapped to a known string
93+
ErrorStr string
94+
}
95+
96+
func (c *Crawler) crawlDiscV4(ctx context.Context, pi PeerInfo) DiscV4Result {
97+
// mutex to guard access to result and allNeighbors
98+
mu := sync.RWMutex{}
99+
100+
// the final result struct
101+
result := DiscV4Result{}
102+
103+
// all neighbors of pi. We're using a map to deduplicate.
104+
allNeighbors := map[string]PeerInfo{}
105+
106+
// errorBits tracks at which CPL errors have occurred.
107+
// 0000 0000 0000 0000 - No error
108+
// 0000 0000 0000 0001 - An error has occurred at CPL 0
109+
// 1000 0000 0000 0001 - An error has occurred at CPL 0 and 15
110+
errorBits := atomic.NewUint32(0)
111+
112+
enr, err := c.listener.RequestENR(pi.Node)
113+
if err != nil {
114+
result.ENR = pi.Node
115+
} else {
116+
result.ENR = enr
117+
now := time.Now()
118+
result.RespondedAt = &now
119+
}
120+
121+
errg := errgroup.Group{}
122+
for i := 0; i <= 15; i++ { // 15 is maximum
123+
count := i // Copy value
124+
errg.Go(func() error {
125+
pubKey, err := discvx.GenRandomPublicKey(pi.Node.ID(), count)
126+
if err != nil {
127+
log.WithError(err).WithField("enr", pi.Node.String()).Warnln("Failed generating public key")
128+
errorBits.Add(1 << count)
129+
return fmt.Errorf("generating random public key with CPL %d: %w", count, err)
130+
}
131+
132+
udpAddr := &net.UDPAddr{IP: pi.Node.IP(), Port: pi.Node.UDP()}
133+
134+
var neighbors []*enode.Node
135+
for retry := 0; retry < 2; retry++ {
136+
neighbors, err = c.listener.FindNode(pi.Node.ID(), udpAddr, pubKey)
137+
if err == nil {
138+
break
139+
}
140+
141+
errorBits.Add(1 << count)
142+
143+
if errors.Is(err, discvx.ErrTimeout) {
144+
sleepDur := time.Second * time.Duration(5*(retry+1))
145+
select {
146+
case <-ctx.Done():
147+
return ctx.Err()
148+
case <-time.After(sleepDur): // may add jitter here
149+
continue
150+
}
151+
}
152+
153+
errorBits.Add(1 << count)
154+
155+
return fmt.Errorf("getting closest peer with CPL %d: %w", count, err)
156+
}
157+
158+
mu.Lock()
159+
defer mu.Unlock()
160+
161+
if result.RespondedAt == nil {
162+
now := time.Now()
163+
result.RespondedAt = &now
164+
}
165+
166+
for _, n := range neighbors {
167+
npi, err := NewPeerInfo(n)
168+
if err != nil {
169+
log.WithError(err).Warnln("Failed parsing ethereum node neighbor")
170+
continue
171+
}
172+
allNeighbors[string(npi.peerID)] = npi
173+
}
174+
175+
if err != nil {
176+
errorBits.Add(1 << count)
177+
return err
178+
}
179+
180+
return nil
181+
})
182+
}
183+
184+
// wait for go routines to finish
185+
err = errg.Wait()
186+
187+
// track done timestamp and error
188+
result.DoneAt = time.Now()
189+
result.Error = err
190+
191+
result.RoutingTable = &core.RoutingTable[PeerInfo]{
192+
PeerID: pi.ID(),
193+
Neighbors: []PeerInfo{},
194+
ErrorBits: uint16(errorBits.Load()),
195+
Error: err,
196+
}
197+
198+
for _, n := range allNeighbors {
199+
result.RoutingTable.Neighbors = append(result.RoutingTable.Neighbors, n)
200+
}
201+
202+
// if there was a connection error, parse it to a known one
203+
if result.Error != nil {
204+
result.ErrorStr = db.NetError(result.Error)
205+
}
206+
207+
return result
208+
}

discv4/dialer.go

Lines changed: 56 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,56 @@
1+
package discv4
2+
3+
import (
4+
"context"
5+
"time"
6+
7+
log "github.com/sirupsen/logrus"
8+
9+
"github.com/dennis-tra/nebula-crawler/core"
10+
"github.com/dennis-tra/nebula-crawler/db"
11+
"github.com/dennis-tra/nebula-crawler/discvx"
12+
)
13+
14+
// Dialer encapsulates a libp2p host that dials peers.
15+
type Dialer struct {
16+
id string
17+
dialedPeers uint64
18+
listener *discvx.UDPv4
19+
}
20+
21+
var _ core.Worker[PeerInfo, core.DialResult[PeerInfo]] = (*Dialer)(nil)
22+
23+
// Work takes the PeerInfo object and tries to figure out if the peer is
24+
// still online.
25+
func (d *Dialer) Work(ctx context.Context, task PeerInfo) (core.DialResult[PeerInfo], error) {
26+
// Creating log entry
27+
logEntry := log.WithFields(log.Fields{
28+
"dialerID": d.id,
29+
"remoteID": task.ID().ShortString(),
30+
"dialCount": d.dialedPeers,
31+
})
32+
logEntry.Debugln("Dialing peer")
33+
defer logEntry.Debugln("Dialed peer")
34+
35+
// Initialize dial result
36+
dr := core.DialResult[PeerInfo]{
37+
DialerID: d.id,
38+
Info: task,
39+
DialStartTime: time.Now(),
40+
}
41+
42+
newEnr, err := d.listener.RequestENR(task.Node)
43+
dr.DialEndTime = time.Now()
44+
45+
if err != nil {
46+
dr.Error = err
47+
dr.DialError = db.NetError(dr.Error)
48+
} else {
49+
task.Node = newEnr
50+
dr.Info = task
51+
}
52+
53+
d.dialedPeers += 1
54+
55+
return dr, nil
56+
}

0 commit comments

Comments
 (0)