Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
121 changes: 90 additions & 31 deletions cmd/nebula/cmd_crawl.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package main

import (
"context"
"database/sql"
"errors"
"fmt"
"os"
Expand All @@ -24,6 +25,7 @@ import (
"github.com/dennis-tra/nebula-crawler/discv4"
"github.com/dennis-tra/nebula-crawler/discv5"
"github.com/dennis-tra/nebula-crawler/libp2p"
"github.com/dennis-tra/nebula-crawler/utils"
)

var crawlConfig = &config.Crawl{
Expand Down Expand Up @@ -200,11 +202,18 @@ func CrawlAction(c *cli.Context) error {
return fmt.Errorf("new database client: %w", err)
}
defer func() {
if err := dbc.Close(); err != nil {
if err := dbc.Close(); err != nil && !errors.Is(err, sql.ErrConnDone) {
log.WithError(err).Warnln("Failed closing database handle")
}
}()

// Query some additional bootstrap peers from the database.
// This is optional, and we only log a warning if that doesn't work.
bpAddrInfos, err := dbc.QueryBootstrapPeers(ctx, 10)
if err != nil {
log.WithError(err).Warnln("Failed querying bootstrap peers")
}

// Inserting a crawl row into the db so that we
// can associate results with this crawl via
// its DB identifier
Expand Down Expand Up @@ -239,17 +248,37 @@ func CrawlAction(c *cli.Context) error {

switch cfg.Network {
case string(config.NetworkEthExec):

bpEnodes, err := cfg.BootstrapEnodesV4()
if err != nil {
return err
}

for _, addrInfo := range bpAddrInfos {
n, err := utils.ToEnode(addrInfo.ID, addrInfo.Addrs)
if err != nil {
// this is just a best-effort operation so only
// log the error and continue
log.WithError(err).WithFields(log.Fields{
"pid": addrInfo.ID,
"maddrs": addrInfo.Addrs,
}).Warnln("Failed transforming AddrInfo to *enode.Node")
continue
}
bpEnodes = append(bpEnodes, n)
}

// configure the crawl driver
driverCfg := &discv4.CrawlDriverConfig{
Version: cfg.Root.Version(),
DialTimeout: cfg.Root.DialTimeout,
TrackNeighbors: cfg.PersistNeighbors,
BootstrapPeerStrs: cfg.BootstrapPeers.Value(),
AddrDialType: cfg.AddrDialType(),
AddrTrackType: cfg.AddrTrackType(),
TracerProvider: cfg.Root.TracerProvider,
MeterProvider: cfg.Root.MeterProvider,
LogErrors: cfg.Root.LogErrors,
Version: cfg.Root.Version(),
DialTimeout: cfg.Root.DialTimeout,
TrackNeighbors: cfg.PersistNeighbors,
BootstrapPeers: bpEnodes,
AddrDialType: cfg.AddrDialType(),
AddrTrackType: cfg.AddrTrackType(),
TracerProvider: cfg.Root.TracerProvider,
MeterProvider: cfg.Root.MeterProvider,
LogErrors: cfg.Root.LogErrors,
}

// init the crawl driver
Expand Down Expand Up @@ -281,18 +310,38 @@ func CrawlAction(c *cli.Context) error {

case string(config.NetworkEthCons),
string(config.NetworkHolesky): // use a different driver etc. for the Ethereum consensus layer + Holesky Testnet

bpEnodes, err := cfg.BootstrapEnodesV5()
if err != nil {
return err
}

for _, addrInfo := range bpAddrInfos {
n, err := utils.ToEnode(addrInfo.ID, addrInfo.Addrs)
if err != nil {
// this is just a best-effort operation so only
// log the error and continue
log.WithError(err).WithFields(log.Fields{
"pid": addrInfo.ID,
"maddrs": addrInfo.Addrs,
}).Warnln("Failed transforming AddrInfo to *enode.Node")
continue
}
bpEnodes = append(bpEnodes, n)
}

// configure the crawl driver
driverCfg := &discv5.CrawlDriverConfig{
Version: cfg.Root.Version(),
DialTimeout: cfg.Root.DialTimeout,
TrackNeighbors: cfg.PersistNeighbors,
BootstrapPeerStrs: cfg.BootstrapPeers.Value(),
AddrDialType: cfg.AddrDialType(),
AddrTrackType: cfg.AddrTrackType(),
KeepENR: crawlConfig.KeepENR,
TracerProvider: cfg.Root.TracerProvider,
MeterProvider: cfg.Root.MeterProvider,
LogErrors: cfg.Root.LogErrors,
Version: cfg.Root.Version(),
DialTimeout: cfg.Root.DialTimeout,
TrackNeighbors: cfg.PersistNeighbors,
BootstrapPeers: bpEnodes,
AddrDialType: cfg.AddrDialType(),
AddrTrackType: cfg.AddrTrackType(),
KeepENR: crawlConfig.KeepENR,
TracerProvider: cfg.Root.TracerProvider,
MeterProvider: cfg.Root.MeterProvider,
LogErrors: cfg.Root.LogErrors,
}

// init the crawl driver
Expand Down Expand Up @@ -322,19 +371,29 @@ func CrawlAction(c *cli.Context) error {

return nil
default:

addrInfos, err := cfg.BootstrapAddrInfos()
if err != nil {
return err
}

for _, addrInfo := range addrInfos {
bpAddrInfos = append(bpAddrInfos, addrInfo)
}

// configure the crawl driver
driverCfg := &libp2p.CrawlDriverConfig{
Version: cfg.Root.Version(),
Protocols: cfg.Protocols.Value(),
DialTimeout: cfg.Root.DialTimeout,
TrackNeighbors: cfg.PersistNeighbors,
CheckExposed: cfg.CheckExposed,
BootstrapPeerStrs: cfg.BootstrapPeers.Value(),
AddrDialType: cfg.AddrDialType(),
AddrTrackType: cfg.AddrTrackType(),
TracerProvider: cfg.Root.TracerProvider,
MeterProvider: cfg.Root.MeterProvider,
LogErrors: cfg.Root.LogErrors,
Version: cfg.Root.Version(),
Protocols: cfg.Protocols.Value(),
DialTimeout: cfg.Root.DialTimeout,
TrackNeighbors: cfg.PersistNeighbors,
CheckExposed: cfg.CheckExposed,
BootstrapPeers: bpAddrInfos,
AddrDialType: cfg.AddrDialType(),
AddrTrackType: cfg.AddrTrackType(),
TracerProvider: cfg.Root.TracerProvider,
MeterProvider: cfg.Root.MeterProvider,
LogErrors: cfg.Root.LogErrors,
}

// init the crawl driver
Expand Down
72 changes: 72 additions & 0 deletions config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,10 @@ import (
"strconv"
"time"

"github.com/ethereum/go-ethereum/p2p/enode"
kaddht "github.com/libp2p/go-libp2p-kad-dht"
"github.com/libp2p/go-libp2p/core/peer"
ma "github.com/multiformats/go-multiaddr"
"github.com/urfave/cli/v2"
"go.opentelemetry.io/otel/metric"
"go.opentelemetry.io/otel/trace"
Expand Down Expand Up @@ -244,6 +247,75 @@ func (c *Crawl) AddrDialType() AddrType {
return AddrType(c.AddrDialTypeStr)
}

// BootstrapAddrInfos parses the configured bootstrap peers as libp2p
// multiaddresses and groups them into one [peer.AddrInfo] per peer ID,
// so a peer listed under several addresses yields a single entry with
// all of its addresses. It returns an error on the first string that
// fails to parse as a p2p multiaddress.
func (c *Crawl) BootstrapAddrInfos() ([]peer.AddrInfo, error) {
	addrInfoMap := map[peer.ID][]ma.Multiaddr{}
	for _, maddrStr := range c.BootstrapPeers.Value() {
		maddr, err := ma.NewMultiaddr(maddrStr)
		if err != nil {
			return nil, fmt.Errorf("parse multiaddress %s: %w", maddrStr, err)
		}

		pi, err := peer.AddrInfoFromP2pAddr(maddr)
		if err != nil {
			return nil, fmt.Errorf("parse addr info from maddr %s: %w", maddr, err)
		}

		// Appending to a missing key works on the nil slice, so no
		// explicit existence check is needed here.
		addrInfoMap[pi.ID] = append(addrInfoMap[pi.ID], pi.Addrs...)
	}

	addrInfos := make([]peer.AddrInfo, 0, len(addrInfoMap))
	for pid, maddrs := range addrInfoMap {
		addrInfos = append(addrInfos, peer.AddrInfo{
			ID:    pid,
			Addrs: maddrs,
		})
	}

	return addrInfos, nil
}

// BootstrapEnodesV4 parses the configured bootstrap peers as discv4
// enode URLs (enode://...) and returns them with duplicate node IDs
// collapsed to a single entry. Parsing stops at the first invalid URL.
func (c *Crawl) BootstrapEnodesV4() ([]*enode.Node, error) {
	unique := make(map[enode.ID]*enode.Node)
	for _, rawURL := range c.BootstrapPeers.Value() {
		node, err := enode.ParseV4(rawURL)
		if err != nil {
			return nil, fmt.Errorf("parse bootstrap enode URL %s: %w", rawURL, err)
		}
		unique[node.ID()] = node
	}

	result := make([]*enode.Node, 0, len(unique))
	for _, node := range unique {
		result = append(result, node)
	}

	return result, nil
}

// BootstrapEnodesV5 parses the configured bootstrap peers as ENRs
// (discv5 node records) and returns them de-duplicated by node ID.
// Parsing stops at the first invalid record.
func (c *Crawl) BootstrapEnodesV5() ([]*enode.Node, error) {
	nodesMap := map[enode.ID]*enode.Node{}
	for _, enr := range c.BootstrapPeers.Value() {
		n, err := enode.Parse(enode.ValidSchemes, enr)
		if err != nil {
			// include the offending record for easier debugging,
			// mirroring the diagnostics of BootstrapEnodesV4
			return nil, fmt.Errorf("parse bootstrap enr %s: %w", enr, err)
		}
		nodesMap[n.ID()] = n
	}

	enodes := make([]*enode.Node, 0, len(nodesMap))
	for _, node := range nodesMap {
		enodes = append(enodes, node)
	}

	return enodes, nil
}

// String prints the configuration as a json string
func (c *Crawl) String() string {
data, _ := json.MarshalIndent(c, "", " ")
Expand Down
1 change: 1 addition & 0 deletions db/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ type Client interface {
io.Closer
InitCrawl(ctx context.Context, version string) (*models.Crawl, error)
UpdateCrawl(ctx context.Context, crawl *models.Crawl) error
QueryBootstrapPeers(ctx context.Context, limit int) ([]peer.AddrInfo, error)
PersistCrawlProperties(ctx context.Context, crawl *models.Crawl, properties map[string]map[string]int) error
PersistCrawlVisit(ctx context.Context, crawlID int, peerID peer.ID, maddrs []ma.Multiaddr, protocols []string, agentVersion string, connectDuration time.Duration, crawlDuration time.Duration, visitStartedAt time.Time, visitEndedAt time.Time, connectErrorStr string, crawlErrorStr string, properties null.JSON) (*InsertVisitResult, error)
PersistNeighbors(ctx context.Context, crawl *models.Crawl, dbPeerID *int, peerID peer.ID, errorBits uint16, dbNeighborsIDs []int, neighbors []peer.ID) error
Expand Down
8 changes: 4 additions & 4 deletions db/client_db.go
Original file line number Diff line number Diff line change
Expand Up @@ -784,8 +784,8 @@ func (c *DBClient) PersistCrawlProperties(ctx context.Context, crawl *models.Cra
func (c *DBClient) QueryBootstrapPeers(ctx context.Context, limit int) ([]peer.AddrInfo, error) {
peers, err := models.Peers(
qm.Load(models.PeerRels.MultiAddresses),
qm.InnerJoin("sessions_closed s on s.peer_id = peers.id"),
qm.OrderBy("s.created_at"),
qm.InnerJoin("sessions_open s on s.peer_id = peers.id"),
qm.OrderBy("s.last_visited_at"),
qm.Limit(limit),
).All(ctx, c.dbh)
if err != nil {
Expand All @@ -796,14 +796,14 @@ func (c *DBClient) QueryBootstrapPeers(ctx context.Context, limit int) ([]peer.A
for _, p := range peers {
id, err := peer.Decode(p.MultiHash)
if err != nil {
log.Warnln("Could not decode multi hash ", p.MultiHash)
log.WithError(err).Warnln("Could not decode multi hash ", p.MultiHash)
continue
}
var maddrs []ma.Multiaddr
for _, maddrStr := range p.R.MultiAddresses {
maddr, err := ma.NewMultiaddr(maddrStr.Maddr)
if err != nil {
log.Warnln("Could not decode multi addr ", maddrStr)
log.WithError(err).Warnln("Could not decode multi addr ", maddrStr)
continue
}

Expand Down
4 changes: 4 additions & 0 deletions db/client_json.go
Original file line number Diff line number Diff line change
Expand Up @@ -102,6 +102,10 @@ func (c *JSONClient) UpdateCrawl(ctx context.Context, crawl *models.Crawl) error
return nil
}

// QueryBootstrapPeers implements the [Client] interface. The JSON client
// keeps no queryable history of previously seen peers, so it always
// returns an empty slice and no error.
func (c *JSONClient) QueryBootstrapPeers(ctx context.Context, limit int) ([]peer.AddrInfo, error) {
	return []peer.AddrInfo{}, nil
}

func (c *JSONClient) PersistCrawlProperties(ctx context.Context, crawl *models.Crawl, properties map[string]map[string]int) error {
data, err := json.Marshal(properties)
if err != nil {
Expand Down
4 changes: 4 additions & 0 deletions db/client_noop.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,10 @@ func (n *NoopClient) UpdateCrawl(ctx context.Context, crawl *models.Crawl) error
return nil
}

// QueryBootstrapPeers implements the [Client] interface. The no-op client
// persists nothing, so there are never any stored peers to return — it
// always yields an empty slice and no error.
func (n *NoopClient) QueryBootstrapPeers(ctx context.Context, limit int) ([]peer.AddrInfo, error) {
	return []peer.AddrInfo{}, nil
}

func (n *NoopClient) PersistCrawlProperties(ctx context.Context, crawl *models.Crawl, properties map[string]map[string]int) error {
return nil
}
Expand Down
34 changes: 12 additions & 22 deletions discv4/driver_crawler.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,6 @@ import (
"runtime"
"time"

"github.com/dennis-tra/nebula-crawler/devp2p"

ethcrypto "github.com/ethereum/go-ethereum/crypto"
"github.com/ethereum/go-ethereum/crypto/secp256k1"
"github.com/ethereum/go-ethereum/p2p/enode"
Expand All @@ -25,6 +23,7 @@ import (
"github.com/dennis-tra/nebula-crawler/core"
"github.com/dennis-tra/nebula-crawler/db"
"github.com/dennis-tra/nebula-crawler/db/models"
"github.com/dennis-tra/nebula-crawler/devp2p"
"github.com/dennis-tra/nebula-crawler/discvx"
"github.com/dennis-tra/nebula-crawler/utils"
)
Expand Down Expand Up @@ -103,15 +102,15 @@ func (p PeerInfo) Merge(other PeerInfo) PeerInfo {
}

type CrawlDriverConfig struct {
Version string
TrackNeighbors bool
DialTimeout time.Duration
BootstrapPeerStrs []string
AddrDialType config.AddrType
AddrTrackType config.AddrType
MeterProvider metric.MeterProvider
TracerProvider trace.TracerProvider
LogErrors bool
Version string
TrackNeighbors bool
DialTimeout time.Duration
BootstrapPeers []*enode.Node
AddrDialType config.AddrType
AddrTrackType config.AddrType
MeterProvider metric.MeterProvider
TracerProvider trace.TracerProvider
LogErrors bool
}

func (cfg *CrawlDriverConfig) CrawlerConfig() *CrawlerConfig {
Expand Down Expand Up @@ -163,17 +162,8 @@ func NewCrawlDriver(dbc db.Client, crawl *models.Crawl, cfg *CrawlDriverConfig)
clients = append(clients, c)
}

nodesMap := map[enode.ID]*enode.Node{}
for _, url := range cfg.BootstrapPeerStrs {
n, err := enode.ParseV4(url)
if err != nil {
return nil, fmt.Errorf("parse bootstrap enode URL %s: %w", url, err)
}
nodesMap[n.ID()] = n
}

tasksChan := make(chan PeerInfo, len(nodesMap))
for _, node := range nodesMap {
tasksChan := make(chan PeerInfo, len(cfg.BootstrapPeers))
for _, node := range cfg.BootstrapPeers {
pi, err := NewPeerInfo(node)
if err != nil {
return nil, fmt.Errorf("new peer info from enr: %w", err)
Expand Down
Loading