Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .github/workflows/pull_request_main.yml
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ jobs:
postgres:
image: postgres
ports:
- "2345:5432"
- "5432:5432"
env:
POSTGRES_PASSWORD: password
POSTGRES_USER: nebula_test
Expand Down
12 changes: 10 additions & 2 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@
[![GitHub license](https://img.shields.io/github/license/dennis-tra/nebula)](https://github.com/dennis-tra/nebula/blob/main/LICENSE)
[![Hits](https://hits.seeyoufarm.com/api/count/incr/badge.svg?url=https%3A%2F%2Fgithub.com%2Fdennis-tra%2Fnebula&count_bg=%2379C83D&title_bg=%23555555&icon=&icon_color=%23E7E7E7&title=hits&edge_flat=false)](https://hits.seeyoufarm.com)

A DHT crawler and monitor that tracks the liveness of peers. The crawler connects to [DHT](https://en.wikipedia.org/wiki/Distributed_hash_table) bootstrappers and then recursively follows all entries in their [k-buckets](https://en.wikipedia.org/wiki/Kademlia) until all peers have been visited. The crawler supports the following networks:
A network agnostic DHT crawler and monitor. The crawler connects to [DHT](https://en.wikipedia.org/wiki/Distributed_hash_table) bootstrappers and then recursively follows all entries in their [k-buckets](https://en.wikipedia.org/wiki/Kademlia) until all peers have been visited. The crawler supports the following networks:

- [IPFS](https://ipfs.network) - [_Amino DHT_](https://blog.ipfs.tech/2023-09-amino-refactoring/)
- [Ethereum](https://ethereum.org/en/) - [_Consensus Layer_](https://ethereum.org/uz/developers/docs/networking-layer/#consensus-discovery)
Expand All @@ -25,9 +25,10 @@ _The crawler was:_
- 🏆 _awarded a prize in the [DI2F Workshop hackathon](https://research.protocol.ai/blog/2021/decentralising-the-internet-with-ipfs-and-filecoin-di2f-a-report-from-the-trenches/)._ 🏆
- 🎓 _used for the ACM SigCOMM'22 paper [Design and Evaluation of IPFS: A Storage Layer for the Decentralized Web](https://research.protocol.ai/publications/design-and-evaluation-of-ipfs-a-storage-layer-for-the-decentralized-web/trautwein2022.pdf)_ 🎓


📊 [ProbeLab](https://probelab.io) is publishing weekly reports for the IPFS Amino DHT based on the crawl results [here](https://github.com/protocol/network-measurements/tree/master/reports)! 📊

📺 You can find a demo on YouTube: [Nebula: A Network Agnostic DHT Crawler](https://www.youtube.com/watch?v=QDgvCBDqNMc) 📺

![Screenshot from a Grafana dashboard](./docs/grafana-screenshot.png)

## Table of Contents
Expand All @@ -46,6 +47,7 @@ _The crawler was:_
- [Tests](#tests)
- [Report](#report)
- [Related Efforts](#related-efforts)
- [Demo](#demo)
- [Maintainers](#maintainers)
- [Contributing](#contributing)
- [Support](#support)
Expand Down Expand Up @@ -333,6 +335,12 @@ There is a top-level `report` folder that contains a script to generate a compre
- [`migalabs/armiarma`](https://github.com/migalabs/armiarma) - Armiarma is a Libp2p open-network crawler with a current focus on Ethereum's CL network
- [`migalabs/eth-light-crawler`](https://github.com/migalabs/eth-light-crawler) - Ethereum light crawler by [@cortze](https://github.com/cortze).

## Demo

The following presentation shows ways to use Nebula by showcasing crawls of the Amino, Celestia, and Ethereum DHTs:

[![Nebula: A Network Agnostic DHT Crawler - Dennis Trautwein](https://img.youtube.com/vi/QDgvCBDqNMc/0.jpg)](https://www.youtube.com/watch?v=QDgvCBDqNMc)

## Maintainers

[@dennis-tra](https://github.com/dennis-tra).
Expand Down
28 changes: 19 additions & 9 deletions cmd/nebula/cmd_crawl.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,13 +31,14 @@ var crawlConfig = &config.Crawl{
WriteWorkerCount: 10,
CrawlLimit: 0,
PersistNeighbors: false,
CheckExposed: false,
FilePathUdgerDB: "",
Network: string(config.NetworkIPFS),
BootstrapPeers: cli.NewStringSlice(),
Protocols: cli.NewStringSlice(string(kaddht.ProtocolDHT)),
AddrTrackTypeStr: "public",
AddrDialTypeStr: "public",
KeepENR: false,
CheckExposed: false,
}

// CrawlCommand contains the crawl sub-command configuration.
Expand Down Expand Up @@ -138,14 +139,6 @@ var CrawlCommand = &cli.Command{
Value: crawlConfig.PersistNeighbors,
Destination: &crawlConfig.PersistNeighbors,
},
&cli.BoolFlag{
Name: "check-exposed",
Usage: "Whether to check if the Kubo API is exposed. Checking also includes crawling the API.",
EnvVars: []string{"NEBULA_CRAWL_CHECK_EXPOSED"},
Value: crawlConfig.CheckExposed,
Destination: &crawlConfig.CheckExposed,
Category: flagCategoryNetwork,
},
&cli.StringFlag{
Name: "addr-track-type",
Usage: "Which type addresses should be stored to the database (private, public, any)",
Expand All @@ -167,6 +160,22 @@ var CrawlCommand = &cli.Command{
Value: crawlConfig.Network,
Destination: &crawlConfig.Network,
},
&cli.BoolFlag{
Name: "check-exposed",
Usage: "IPFS/AMINO: Whether to check if the Kubo API is exposed. Checking also includes crawling the API.",
EnvVars: []string{"NEBULA_CRAWL_CHECK_EXPOSED"},
Value: crawlConfig.CheckExposed,
Destination: &crawlConfig.CheckExposed,
Category: flagCategoryNetwork,
},
&cli.BoolFlag{
Name: "keep-enr",
Usage: "ETHEREUM_CONSENSUS: Whether to keep the full ENR.",
EnvVars: []string{"NEBULA_CRAWL_KEEP_ENR"},
Value: crawlConfig.KeepENR,
Destination: &crawlConfig.KeepENR,
Category: flagCategoryNetwork,
},
},
}

Expand Down Expand Up @@ -227,6 +236,7 @@ func CrawlAction(c *cli.Context) error {
BootstrapPeerStrs: cfg.BootstrapPeers.Value(),
AddrDialType: cfg.AddrDialType(),
AddrTrackType: cfg.AddrTrackType(),
KeepENR: crawlConfig.KeepENR,
}

// init the crawl driver
Expand Down
77 changes: 71 additions & 6 deletions cmd/nebula/cmd_monitor.go
Original file line number Diff line number Diff line change
@@ -1,15 +1,20 @@
package main

import (
"context"
"errors"
"fmt"

kaddht "github.com/libp2p/go-libp2p-kad-dht"
"github.com/libp2p/go-libp2p/core/network"
log "github.com/sirupsen/logrus"
"github.com/urfave/cli/v2"

"github.com/dennis-tra/nebula-crawler/pkg/config"
"github.com/dennis-tra/nebula-crawler/pkg/core"
"github.com/dennis-tra/nebula-crawler/pkg/db"
"github.com/dennis-tra/nebula-crawler/pkg/monitor"
"github.com/dennis-tra/nebula-crawler/pkg/discv5"
"github.com/dennis-tra/nebula-crawler/pkg/libp2p"
)

var monitorConfig = &config.Monitor{
Expand Down Expand Up @@ -62,6 +67,13 @@ var MonitorCommand = &cli.Command{
Destination: &monitorConfig.WriteWorkerCount,
Hidden: true,
},
&cli.StringFlag{
Name: "network",
Usage: "Which network belong the database sessions to. Relevant for parsing peer IDs and muti addresses.",
EnvVars: []string{"NEBULA_MONITOR_NETWORK"},
Value: monitorConfig.Network,
Destination: &monitorConfig.Network,
},
},
}

Expand All @@ -81,11 +93,64 @@ func MonitorAction(c *cli.Context) error {
}
}()

// Initialize the monitoring task
s, err := monitor.New(dbc, monitorConfig)
if err != nil {
return fmt.Errorf("new monitor: %w", err)
handlerCfg := &core.DialHandlerConfig{}

engineCfg := &core.EngineConfig{
WorkerCount: monitorConfig.MonitorWorkerCount,
WriterCount: monitorConfig.WriteWorkerCount,
Limit: 0,
DuplicateProcessing: true,
}

return s.MonitorNetwork(c.Context)
switch monitorConfig.Network {
case string(config.NetworkEthCons):
driverCfg := &discv5.DialDriverConfig{
Version: monitorConfig.Root.Version(),
}

driver, err := discv5.NewDialDriver(dbc, driverCfg)
if err != nil {
return fmt.Errorf("new driver: %w", err)
}

handler := core.NewDialHandler[discv5.PeerInfo](handlerCfg)
eng, err := core.NewEngine[discv5.PeerInfo, core.DialResult[discv5.PeerInfo]](driver, handler, engineCfg)
if err != nil {
return fmt.Errorf("new engine: %w", err)
}

_, err = eng.Run(c.Context)
if err != nil && !errors.Is(err, context.Canceled) {
return fmt.Errorf("running crawl engine: %w", err)
}

default:
driverCfg := &libp2p.DialDriverConfig{
Version: monitorConfig.Root.Version(),
}

driver, err := libp2p.NewDialDriver(dbc, driverCfg)
if err != nil {
return fmt.Errorf("new driver: %w", err)
}

handler := core.NewDialHandler[libp2p.PeerInfo](handlerCfg)
eng, err := core.NewEngine[libp2p.PeerInfo, core.DialResult[libp2p.PeerInfo]](driver, handler, engineCfg)
if err != nil {
return fmt.Errorf("new engine: %w", err)
}

// Set the timeout for dialing peers
ctx := network.WithDialPeerTimeout(c.Context, monitorConfig.Root.DialTimeout)

// Force direct dials will prevent swarm to run into dial backoff
// errors. It also prevents proxied connections.
ctx = network.WithForceDirectDial(ctx, "prevent backoff")

_, err = eng.Run(ctx)
if err != nil && !errors.Is(err, context.Canceled) {
return fmt.Errorf("running crawl engine: %w", err)
}
}
return nil
}
9 changes: 6 additions & 3 deletions pkg/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -188,9 +188,6 @@ type Crawl struct {
// Whether to persist all k-bucket entries
PersistNeighbors bool

// Whether to check if the Kubo API is exposed
CheckExposed bool

// File path to the udger database
FilePathUdgerDB string

Expand All @@ -202,6 +199,12 @@ type Crawl struct {

// Which type of addresses should Nebula try to dial (private, public, both)
AddrDialTypeStr string

// Whether to check if the Kubo API is exposed
CheckExposed bool

// Whether to keep the full enr record alongside all parsed kv-pairs
KeepENR bool
}

func (c *Crawl) AddrTrackType() AddrType {
Expand Down
92 changes: 78 additions & 14 deletions pkg/discv5/crawler.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,11 +2,8 @@ package discv5

import (
"context"
"encoding/binary"
"encoding/hex"
"encoding/json"
"fmt"
"math/bits"
"strings"
"sync"
"time"
Expand All @@ -30,6 +27,7 @@ import (
type CrawlerConfig struct {
DialTimeout time.Duration
AddrDialType config.AddrType
KeepENR bool
}

type Crawler struct {
Expand Down Expand Up @@ -93,24 +91,30 @@ func (c *Crawler) PeerProperties(node *enode.Node) json.RawMessage {

var enrEntryEth2 ENREntryEth2
if err := node.Load(&enrEntryEth2); err == nil {
if beaconData, err := enrEntryEth2.Data(); err == nil {
properties["fork_digest"] = beaconData.ForkDigest.String()
properties["next_fork_version"] = beaconData.NextForkVersion.String()
properties["next_fork_epoch"] = beaconData.NextForkEpoch.String()
}
properties["fork_digest"] = enrEntryEth2.ForkDigest.String()
properties["next_fork_version"] = enrEntryEth2.NextForkVersion.String()
properties["next_fork_epoch"] = enrEntryEth2.NextForkEpoch.String()
}

var enrEntryAttnets ENREntryAttnets
if err := node.Load(&enrEntryAttnets); err == nil {
rawInt := binary.BigEndian.Uint64(enrEntryAttnets)
properties["attnets_num"] = bits.OnesCount64(rawInt)
properties["attnets"] = hex.EncodeToString(enrEntryAttnets)
properties["attnets_num"] = enrEntryAttnets.AttnetsNum
properties["attnets"] = enrEntryAttnets.Attnest
}

var enrEntrySyncCommsSubnet ENREntrySyncCommsSubnet
if err := node.Load(&enrEntrySyncCommsSubnet); err == nil {
// check out https://github.com/prysmaticlabs/prysm/blob/203dc5f63b060821c2706f03a17d66b3813c860c/beacon-chain/p2p/subnets.go#L221
properties["syncnets"] = hex.EncodeToString(enrEntrySyncCommsSubnet)
properties["syncnets"] = enrEntrySyncCommsSubnet.SyncNets
}

var enrEntryOpStack ENREntryOpStack
if err := node.Load(&enrEntryOpStack); err == nil {
properties["opstack_chain_id"] = enrEntryOpStack.ChainID
properties["opstack_version"] = enrEntryOpStack.Version
}

if c.cfg.KeepENR {
properties["enr"] = node.String()
}

data, err := json.Marshal(properties)
Expand Down Expand Up @@ -202,17 +206,31 @@ func (c *Crawler) connect(ctx context.Context, pi peer.AddrInfo) error {
if len(pi.Addrs) == 0 {
metrics.VisitErrorsCount.With(metrics.CrawlLabel).Inc()
return fmt.Errorf("skipping node as it has no public IP address") // change knownErrs map if changing this msg
} else if len(pi.Addrs) == 1 {
}

dialAddrInfo := peer.AddrInfo{
ID: pi.ID,
Addrs: ensureTCPAddr(pi.Addrs),
}

replaced := false
if len(dialAddrInfo.Addrs) != len(pi.Addrs) {
replaced = true
}

retry := 0
maxRetries := 1
for {
timeout := time.Duration(c.cfg.DialTimeout.Nanoseconds() / int64(retry+1))
timeoutCtx, cancel := context.WithTimeout(ctx, timeout)
err := c.host.Connect(timeoutCtx, pi)
err := c.host.Connect(timeoutCtx, dialAddrInfo)
cancel()

if err == nil {
if replaced {
log.WithField("remoteID", pi.ID.ShortString()).Errorln("hat was gebracht!")
}
return nil
}

Expand Down Expand Up @@ -241,6 +259,52 @@ func (c *Crawler) connect(ctx context.Context, pi peer.AddrInfo) error {
}
}

// ensureTCPAddr guarantees that the returned multiaddress list contains at
// least one TCP address. Discovery v5 advertises UDP endpoints, while the
// libp2p host here dials over TCP, so without this fallback such peers would
// have no dialable address.
//
// If any address in maddrs already carries a TCP component, maddrs is
// returned unchanged. Otherwise the first address that has both a UDP port
// and an IP4/IP6 component is used to synthesize a TCP address with the same
// IP and port, which is appended after all original addresses. If no address
// can be converted, maddrs is returned as-is.
func ensureTCPAddr(maddrs []ma.Multiaddr) []ma.Multiaddr {
	// Fast path: a TCP address is already present, nothing to do.
	for _, maddr := range maddrs {
		if _, err := maddr.ValueForProtocol(ma.P_TCP); err == nil {
			return maddrs
		}
	}

	for _, maddr := range maddrs {
		port, err := maddr.ValueForProtocol(ma.P_UDP)
		if err != nil {
			continue // no UDP component to borrow a port from
		}

		// Extract the IP component, preferring IPv4 over IPv6.
		var ipPrefix string
		if ip4, err := maddr.ValueForProtocol(ma.P_IP4); err == nil {
			ipPrefix = "/ip4/" + ip4
		} else if ip6, err := maddr.ValueForProtocol(ma.P_IP6); err == nil {
			ipPrefix = "/ip6/" + ip6
		} else {
			continue // neither IP4 nor IP6 present
		}

		// Heuristic: assume the peer also listens on TCP at its UDP port.
		tcpMaddr, err := ma.NewMultiaddr(ipPrefix + "/tcp/" + port)
		if err != nil {
			continue
		}

		// Keep all original addresses in order and append the synthesized
		// TCP address at the end.
		newMaddrs := make([]ma.Multiaddr, 0, len(maddrs)+1)
		newMaddrs = append(newMaddrs, maddrs...)
		newMaddrs = append(newMaddrs, tcpMaddr)
		return newMaddrs
	}

	return maddrs
}

// identifyWait waits until any connection to a peer passed the Identify
// exchange successfully or all identification attempts have failed.
// The call to IdentifyWait returns immediately if the connection was
Expand Down
Loading