Skip to content
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
28 changes: 19 additions & 9 deletions cmd/nebula/cmd_crawl.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,13 +31,14 @@ var crawlConfig = &config.Crawl{
WriteWorkerCount: 10,
CrawlLimit: 0,
PersistNeighbors: false,
CheckExposed: false,
FilePathUdgerDB: "",
Network: string(config.NetworkIPFS),
BootstrapPeers: cli.NewStringSlice(),
Protocols: cli.NewStringSlice(string(kaddht.ProtocolDHT)),
AddrTrackTypeStr: "public",
AddrDialTypeStr: "public",
KeepENR: false,
CheckExposed: false,
}

// CrawlCommand contains the crawl sub-command configuration.
Expand Down Expand Up @@ -138,14 +139,6 @@ var CrawlCommand = &cli.Command{
Value: crawlConfig.PersistNeighbors,
Destination: &crawlConfig.PersistNeighbors,
},
&cli.BoolFlag{
Name: "check-exposed",
Usage: "Whether to check if the Kubo API is exposed. Checking also includes crawling the API.",
EnvVars: []string{"NEBULA_CRAWL_CHECK_EXPOSED"},
Value: crawlConfig.CheckExposed,
Destination: &crawlConfig.CheckExposed,
Category: flagCategoryNetwork,
},
&cli.StringFlag{
Name: "addr-track-type",
Usage: "Which type addresses should be stored to the database (private, public, any)",
Expand All @@ -167,6 +160,22 @@ var CrawlCommand = &cli.Command{
Value: crawlConfig.Network,
Destination: &crawlConfig.Network,
},
&cli.BoolFlag{
Name: "check-exposed",
Usage: "IPFS/AMINO: Whether to check if the Kubo API is exposed. Checking also includes crawling the API.",
EnvVars: []string{"NEBULA_CRAWL_CHECK_EXPOSED"},
Value: crawlConfig.CheckExposed,
Destination: &crawlConfig.CheckExposed,
Category: flagCategoryNetwork,
},
&cli.BoolFlag{
Name: "keep-enr",
Usage: "ETHEREUM_CONSENSUS: Whether to keep the full ENR.",
EnvVars: []string{"NEBULA_CRAWL_KEEP_ENR"},
Value: crawlConfig.KeepENR,
Destination: &crawlConfig.KeepENR,
Category: flagCategoryNetwork,
},
},
}

Expand Down Expand Up @@ -227,6 +236,7 @@ func CrawlAction(c *cli.Context) error {
BootstrapPeerStrs: cfg.BootstrapPeers.Value(),
AddrDialType: cfg.AddrDialType(),
AddrTrackType: cfg.AddrTrackType(),
KeepENR: crawlConfig.KeepENR,
}

// init the crawl driver
Expand Down
77 changes: 71 additions & 6 deletions cmd/nebula/cmd_monitor.go
Original file line number Diff line number Diff line change
@@ -1,15 +1,20 @@
package main

import (
"context"
"errors"
"fmt"

kaddht "github.com/libp2p/go-libp2p-kad-dht"
"github.com/libp2p/go-libp2p/core/network"
log "github.com/sirupsen/logrus"
"github.com/urfave/cli/v2"

"github.com/dennis-tra/nebula-crawler/pkg/config"
"github.com/dennis-tra/nebula-crawler/pkg/core"
"github.com/dennis-tra/nebula-crawler/pkg/db"
"github.com/dennis-tra/nebula-crawler/pkg/monitor"
"github.com/dennis-tra/nebula-crawler/pkg/discv5"
"github.com/dennis-tra/nebula-crawler/pkg/libp2p"
)

var monitorConfig = &config.Monitor{
Expand Down Expand Up @@ -62,6 +67,13 @@ var MonitorCommand = &cli.Command{
Destination: &monitorConfig.WriteWorkerCount,
Hidden: true,
},
&cli.StringFlag{
Name: "network",
Usage:       "Which network the database sessions belong to. Relevant for parsing peer IDs and multi addresses.",
EnvVars: []string{"NEBULA_MONITOR_NETWORK"},
Value: monitorConfig.Network,
Destination: &monitorConfig.Network,
},
},
}

Expand All @@ -81,11 +93,64 @@ func MonitorAction(c *cli.Context) error {
}
}()

// Initialize the monitoring task
s, err := monitor.New(dbc, monitorConfig)
if err != nil {
return fmt.Errorf("new monitor: %w", err)
handlerCfg := &core.DialHandlerConfig{}

engineCfg := &core.EngineConfig{
WorkerCount: monitorConfig.MonitorWorkerCount,
WriterCount: monitorConfig.WriteWorkerCount,
Limit: 0,
DuplicateProcessing: true,
}

return s.MonitorNetwork(c.Context)
switch monitorConfig.Network {
case string(config.NetworkEthCons):
driverCfg := &discv5.DialDriverConfig{
Version: monitorConfig.Root.Version(),
}

driver, err := discv5.NewDialDriver(dbc, driverCfg)
if err != nil {
return fmt.Errorf("new driver: %w", err)
}

handler := core.NewDialHandler[discv5.PeerInfo](handlerCfg)
eng, err := core.NewEngine[discv5.PeerInfo, core.DialResult[discv5.PeerInfo]](driver, handler, engineCfg)
if err != nil {
return fmt.Errorf("new engine: %w", err)
}

_, err = eng.Run(c.Context)
if err != nil && !errors.Is(err, context.Canceled) {
return fmt.Errorf("running crawl engine: %w", err)
}

default:
driverCfg := &libp2p.DialDriverConfig{
Version: monitorConfig.Root.Version(),
}

driver, err := libp2p.NewDialDriver(dbc, driverCfg)
if err != nil {
return fmt.Errorf("new driver: %w", err)
}

handler := core.NewDialHandler[libp2p.PeerInfo](handlerCfg)
eng, err := core.NewEngine[libp2p.PeerInfo, core.DialResult[libp2p.PeerInfo]](driver, handler, engineCfg)
if err != nil {
return fmt.Errorf("new engine: %w", err)
}

// Set the timeout for dialing peers
ctx := network.WithDialPeerTimeout(c.Context, monitorConfig.Root.DialTimeout)

// Force direct dials will prevent swarm to run into dial backoff
// errors. It also prevents proxied connections.
ctx = network.WithForceDirectDial(ctx, "prevent backoff")

_, err = eng.Run(ctx)
if err != nil && !errors.Is(err, context.Canceled) {
return fmt.Errorf("running crawl engine: %w", err)
}
}
return nil
}
9 changes: 6 additions & 3 deletions pkg/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -188,9 +188,6 @@ type Crawl struct {
// Whether to persist all k-bucket entries
PersistNeighbors bool

// Whether to check if the Kubo API is exposed
CheckExposed bool

// File path to the udger database
FilePathUdgerDB string

Expand All @@ -202,6 +199,12 @@ type Crawl struct {

// Which type of addresses should Nebula try to dial (private, public, both)
AddrDialTypeStr string

// Whether to check if the Kubo API is exposed
CheckExposed bool

// Whether to keep the full enr record alongside all parsed kv-pairs
KeepENR bool
}

func (c *Crawl) AddrTrackType() AddrType {
Expand Down
92 changes: 78 additions & 14 deletions pkg/discv5/crawler.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,11 +2,8 @@ package discv5

import (
"context"
"encoding/binary"
"encoding/hex"
"encoding/json"
"fmt"
"math/bits"
"strings"
"sync"
"time"
Expand All @@ -30,6 +27,7 @@ import (
type CrawlerConfig struct {
DialTimeout time.Duration
AddrDialType config.AddrType
KeepENR bool
}

type Crawler struct {
Expand Down Expand Up @@ -93,24 +91,30 @@ func (c *Crawler) PeerProperties(node *enode.Node) json.RawMessage {

var enrEntryEth2 ENREntryEth2
if err := node.Load(&enrEntryEth2); err == nil {
if beaconData, err := enrEntryEth2.Data(); err == nil {
properties["fork_digest"] = beaconData.ForkDigest.String()
properties["next_fork_version"] = beaconData.NextForkVersion.String()
properties["next_fork_epoch"] = beaconData.NextForkEpoch.String()
}
properties["fork_digest"] = enrEntryEth2.ForkDigest.String()
properties["next_fork_version"] = enrEntryEth2.NextForkVersion.String()
properties["next_fork_epoch"] = enrEntryEth2.NextForkEpoch.String()
}

var enrEntryAttnets ENREntryAttnets
if err := node.Load(&enrEntryAttnets); err == nil {
rawInt := binary.BigEndian.Uint64(enrEntryAttnets)
properties["attnets_num"] = bits.OnesCount64(rawInt)
properties["attnets"] = hex.EncodeToString(enrEntryAttnets)
properties["attnets_num"] = enrEntryAttnets.AttnetsNum
properties["attnets"] = enrEntryAttnets.Attnest
}

var enrEntrySyncCommsSubnet ENREntrySyncCommsSubnet
if err := node.Load(&enrEntrySyncCommsSubnet); err == nil {
// check out https://github.com/prysmaticlabs/prysm/blob/203dc5f63b060821c2706f03a17d66b3813c860c/beacon-chain/p2p/subnets.go#L221
properties["syncnets"] = hex.EncodeToString(enrEntrySyncCommsSubnet)
properties["syncnets"] = enrEntrySyncCommsSubnet.SyncNets
}

var enrEntryOpStack ENREntryOpStack
if err := node.Load(&enrEntryOpStack); err == nil {
properties["opstack_chain_id"] = enrEntryOpStack.ChainID
properties["opstack_version"] = enrEntryOpStack.Version
}

if c.cfg.KeepENR {
properties["enr"] = node.String()
}

data, err := json.Marshal(properties)
Expand Down Expand Up @@ -202,17 +206,31 @@ func (c *Crawler) connect(ctx context.Context, pi peer.AddrInfo) error {
if len(pi.Addrs) == 0 {
metrics.VisitErrorsCount.With(metrics.CrawlLabel).Inc()
return fmt.Errorf("skipping node as it has no public IP address") // change knownErrs map if changing this msg
} else if len(pi.Addrs) == 1 {
}

dialAddrInfo := peer.AddrInfo{
ID: pi.ID,
Addrs: ensureTCPAddr(pi.Addrs),
}

replaced := false
if len(dialAddrInfo.Addrs) != len(pi.Addrs) {
replaced = true
}

retry := 0
maxRetries := 1
for {
timeout := time.Duration(c.cfg.DialTimeout.Nanoseconds() / int64(retry+1))
timeoutCtx, cancel := context.WithTimeout(ctx, timeout)
err := c.host.Connect(timeoutCtx, pi)
err := c.host.Connect(timeoutCtx, dialAddrInfo)
cancel()

if err == nil {
if replaced {
log.WithField("remoteID", pi.ID.ShortString()).Errorln("Derived TCP address enabled the connection")
}
return nil
}

Expand Down Expand Up @@ -241,6 +259,52 @@ func (c *Crawler) connect(ctx context.Context, pi peer.AddrInfo) error {
}
}

// ensureTCPAddr returns the given multi addresses unchanged if at least one
// of them already contains a TCP component. Otherwise, it derives a TCP
// multi address from the first UDP address in the list (keeping the IP
// version, IP address, and port) and returns the original addresses with the
// derived TCP address appended. If no address can be converted, the input
// slice is returned as-is.
func ensureTCPAddr(maddrs []ma.Multiaddr) []ma.Multiaddr {
	for _, maddr := range maddrs {
		if _, err := maddr.ValueForProtocol(ma.P_TCP); err == nil {
			// at least one TCP address exists - nothing to do
			return maddrs
		}
	}

	for _, maddr := range maddrs {
		udpPort, err := maddr.ValueForProtocol(ma.P_UDP)
		if err != nil {
			continue
		}

		// keep the IP version of the source address
		var ipComp string
		if ip4, err := maddr.ValueForProtocol(ma.P_IP4); err == nil {
			ipComp = "/ip4/" + ip4
		} else if ip6, err := maddr.ValueForProtocol(ma.P_IP6); err == nil {
			ipComp = "/ip6/" + ip6
		} else {
			continue
		}

		// assume the TCP port equals the UDP port
		tcpMaddr, err := ma.NewMultiaddr(ipComp + "/tcp/" + udpPort)
		if err != nil {
			continue
		}

		// allocate the result only now that we know we'll extend the list
		newMaddrs := make([]ma.Multiaddr, 0, len(maddrs)+1)
		newMaddrs = append(newMaddrs, maddrs...)

		return append(newMaddrs, tcpMaddr)
	}

	return maddrs
}

// identifyWait waits until any connection to a peer passed the Identify
// exchange successfully or all identification attempts have failed.
// The call to IdentifyWait returns immediately if the connection was
Expand Down
60 changes: 60 additions & 0 deletions pkg/discv5/crawler_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,60 @@
package discv5

import (
"testing"

ma "github.com/multiformats/go-multiaddr"
"github.com/stretchr/testify/assert"

"github.com/dennis-tra/nebula-crawler/pkg/nebtest"
)

// Test_ensureTCPAddr verifies that ensureTCPAddr leaves address lists that
// already contain a TCP component untouched and otherwise appends a TCP
// variant of the UDP address, for both IPv4 and IPv6.
func Test_ensureTCPAddr(t *testing.T) {
	testCases := []struct {
		name  string
		input []ma.Multiaddr
		want  []ma.Multiaddr
	}{
		{
			name:  "empty",
			input: []ma.Multiaddr{},
			want:  []ma.Multiaddr{},
		},
		{
			name: "tcp exists",
			input: []ma.Multiaddr{
				nebtest.MustMultiaddr(t, "/ip4/123.1.1.1/udp/1234"),
				nebtest.MustMultiaddr(t, "/ip4/123.1.1.1/tcp/1234"),
			},
			want: []ma.Multiaddr{
				nebtest.MustMultiaddr(t, "/ip4/123.1.1.1/udp/1234"),
				nebtest.MustMultiaddr(t, "/ip4/123.1.1.1/tcp/1234"),
			},
		},
		{
			name: "single udp ip4",
			input: []ma.Multiaddr{
				nebtest.MustMultiaddr(t, "/ip4/123.1.1.1/udp/1234"),
			},
			want: []ma.Multiaddr{
				nebtest.MustMultiaddr(t, "/ip4/123.1.1.1/udp/1234"),
				nebtest.MustMultiaddr(t, "/ip4/123.1.1.1/tcp/1234"),
			},
		},
		{
			name: "single udp ip6",
			input: []ma.Multiaddr{
				nebtest.MustMultiaddr(t, "/ip6/::1/udp/1234"),
			},
			want: []ma.Multiaddr{
				nebtest.MustMultiaddr(t, "/ip6/::1/udp/1234"),
				nebtest.MustMultiaddr(t, "/ip6/::1/tcp/1234"),
			},
		},
	}

	for _, tc := range testCases {
		t.Run(tc.name, func(t *testing.T) {
			got := ensureTCPAddr(tc.input)
			assert.Equalf(t, tc.want, got, "ensureTCPAddr(%v)", tc.input)
		})
	}
}
Loading