Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
66 changes: 0 additions & 66 deletions go.sum

Large diffs are not rendered by default.

51 changes: 50 additions & 1 deletion pkg/crawl/crawl_p2p.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (
"github.com/friendsofgo/errors"
kbucket "github.com/libp2p/go-libp2p-kbucket"
"github.com/libp2p/go-libp2p/core/peer"
ma "github.com/multiformats/go-multiaddr"
log "github.com/sirupsen/logrus"
"go.uber.org/atomic"
"golang.org/x/sync/errgroup"
Expand Down Expand Up @@ -44,6 +45,12 @@ type P2PResult struct {

// As it can take some time to handle the result we track the timestamp explicitly
ConnectEndTime time.Time

// All addresses that the remote peer claims to listen on.
// These can differ from the ones we received from another peer,
// e.g., they could be missing quic-v1 addresses if the reporting
// peer doesn't know about that protocol.
ListenAddrs []ma.Multiaddr
}

func (c *Crawler) crawlP2P(ctx context.Context, pi peer.AddrInfo) <-chan P2PResult {
Expand All @@ -67,6 +74,9 @@ func (c *Crawler) crawlP2P(ctx context.Context, pi peer.AddrInfo) <-chan P2PResu
result.CrawlErrorStr = db.NetError(result.CrawlError)
}

// wait for the Identify exchange to complete
c.identifyWait(ctx, pi)

// Extract information from peer store
ps := c.host.Peerstore()

Expand All @@ -82,6 +92,9 @@ func (c *Crawler) crawlP2P(ctx context.Context, pi peer.AddrInfo) <-chan P2PResu
result.Protocols[i] = string(protocols[i])
}
}

// Extract listen addresses
result.ListenAddrs = ps.Addrs(pi.ID)
}

// if there was a connection error, parse it to a known one
Expand All @@ -94,7 +107,7 @@ func (c *Crawler) crawlP2P(ctx context.Context, pi peer.AddrInfo) <-chan P2PResu
log.WithError(err).WithField("remoteID", utils.FmtPeerID(pi.ID)).Warnln("Could not close connection to peer")
}

// send result back and close channel
// send the result back and close channel
select {
case resultCh <- result:
case <-ctx.Done():
Expand Down Expand Up @@ -195,3 +208,39 @@ func (c *Crawler) fetchNeighbors(ctx context.Context, pi peer.AddrInfo) (*Routin

return routingTable, err
}

// identifyWait blocks until any connection to the given peer has passed the
// Identify exchange successfully, all identification attempts have finished,
// or ctx is cancelled. IdentifyWait returns an already-closed channel if the
// connection was identified in the past. We detect a successful
// identification by the presence of a non-empty AgentVersion entry in the
// peer store; as soon as one connection succeeds, waiting on the remaining
// connections is cancelled.
func (c *Crawler) identifyWait(ctx context.Context, pi peer.AddrInfo) {
	idCtx, cancel := context.WithCancel(ctx)
	defer cancel()

	var wg sync.WaitGroup
	for _, conn := range c.host.Network().ConnsToPeer(pi.ID) {
		conn := conn // capture loop variable (required before Go 1.22)

		wg.Add(1)
		go func() {
			defer wg.Done()

			select {
			case <-idCtx.Done():
			case <-c.host.IDService().IdentifyWait(conn):
				// Check whether identification succeeded by looking for
				// the AgentVersion key. If it is present and non-empty,
				// cancel the identification of the remaining connections.
				agent, err := c.host.Peerstore().Get(pi.ID, "AgentVersion")
				if err != nil {
					return
				}
				// Guard the type assertion so an unexpected non-string
				// value in the peer store cannot panic the goroutine.
				if s, ok := agent.(string); ok && s != "" {
					cancel()
				}
			}
		}()
	}

	wg.Wait()
}
6 changes: 3 additions & 3 deletions pkg/crawl/crawler.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,9 +6,9 @@ import (
"time"

pb "github.com/libp2p/go-libp2p-kad-dht/pb"
"github.com/libp2p/go-libp2p/core/host"
"github.com/libp2p/go-libp2p/core/peer"
"github.com/libp2p/go-libp2p/core/protocol"
basichost "github.com/libp2p/go-libp2p/p2p/host/basic"
log "github.com/sirupsen/logrus"
"go.uber.org/atomic"

Expand All @@ -23,7 +23,7 @@ var crawlerID = atomic.NewInt32(0)
// Crawler encapsulates a libp2p host that crawls the network.
type Crawler struct {
id string
host host.Host
host *basichost.BasicHost
config *config.Crawl
pm *pb.ProtocolMessenger
crawledPeers int
Expand All @@ -32,7 +32,7 @@ type Crawler struct {
}

// NewCrawler initializes a new crawler based on the given configuration.
func NewCrawler(h host.Host, conf *config.Crawl) (*Crawler, error) {
func NewCrawler(h *basichost.BasicHost, conf *config.Crawl) (*Crawler, error) {
ms := &msgSender{
h: h,
protocols: protocol.ConvertFromStrings(conf.Protocols.Value()),
Expand Down
3 changes: 2 additions & 1 deletion pkg/crawl/crawler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
"time"

"github.com/libp2p/go-libp2p/core/peer"
basichost "github.com/libp2p/go-libp2p/p2p/host/basic"
mocknet "github.com/libp2p/go-libp2p/p2p/net/mock"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
Expand Down Expand Up @@ -56,7 +57,7 @@ func TestCrawler_handleCrawlJob_unlinked(t *testing.T) {
remote, err := net.GenPeer()
require.NoError(t, err)

crawler, err := NewCrawler(h, &config.Crawl{Protocols: cli.NewStringSlice(), Root: &config.Root{}})
crawler, err := NewCrawler(h.(*basichost.BasicHost), &config.Crawl{Protocols: cli.NewStringSlice(), Root: &config.Root{}})
require.NoError(t, err)

pi := peer.AddrInfo{
Expand Down
17 changes: 17 additions & 0 deletions pkg/crawl/result.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package crawl
import (
"time"

"github.com/dennis-tra/nebula-crawler/pkg/utils"
"github.com/libp2p/go-libp2p/core/peer"
ma "github.com/multiformats/go-multiaddr"
"github.com/volatiletech/null/v8"
Expand Down Expand Up @@ -94,6 +95,22 @@ func (r *Result) Merge(p2pRes P2PResult, apiRes APIResult) {
r.Protocols = apiRes.ID.Protocols
}

var apiResMaddrs []ma.Multiaddr
if apiRes.ID != nil {
maddrs, err := utils.AddrsToMaddrs(apiRes.ID.Addresses)
if err == nil {
apiResMaddrs = maddrs
}
}

if len(apiResMaddrs) > 0 {
r.Peer.Addrs = apiResMaddrs
r.Peer = utils.FilterPrivateMaddrs(r.Peer)
} else if len(p2pRes.ListenAddrs) > 0 {
r.Peer.Addrs = p2pRes.ListenAddrs
r.Peer = utils.FilterPrivateMaddrs(r.Peer)
}

if len(r.RoutingTable.Neighbors) == 0 && apiRes.RoutingTable != nil {
// construct routing table struct from API response
rt := &RoutingTable{
Expand Down
10 changes: 6 additions & 4 deletions pkg/crawl/scheduler.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,9 +7,9 @@ import (
"time"

"github.com/libp2p/go-libp2p"
"github.com/libp2p/go-libp2p/core/host"
"github.com/libp2p/go-libp2p/core/network"
"github.com/libp2p/go-libp2p/core/peer"
basichost "github.com/libp2p/go-libp2p/p2p/host/basic"
rcmgr "github.com/libp2p/go-libp2p/p2p/host/resource-manager"
log "github.com/sirupsen/logrus"

Expand All @@ -32,7 +32,7 @@ import (
// persister directly consume the results queue would not allow that.
type Scheduler struct {
// The libp2p node that's used to crawl the network. This one is also passed to all crawlers.
host host.Host
host *basichost.BasicHost

// The database client
dbc db.Client
Expand Down Expand Up @@ -105,7 +105,7 @@ func NewScheduler(conf *config.Crawl, dbc db.Client) (*Scheduler, error) {
}

s := &Scheduler{
host: h,
host: h.(*basichost.BasicHost),
dbc: dbc,
config: conf,
inCrawlQueue: map[peer.ID]peer.AddrInfo{},
Expand Down Expand Up @@ -408,7 +408,9 @@ func (s *Scheduler) handleResult(ctx context.Context, cr Result) {
} else {
logEntry = logEntry.WithField("dialErr", cr.ConnectErrorStr)
}
} else if cr.CrawlError != nil {
}

if cr.ConnectError == nil && cr.CrawlError != nil {
// Log and count crawl errors
s.errors[cr.CrawlErrorStr] += 1
if cr.CrawlErrorStr == models.NetErrorUnknown {
Expand Down
14 changes: 14 additions & 0 deletions pkg/utils/utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,20 @@ func MaddrsToAddrs(maddrs []ma.Multiaddr) []string {
return addrs
}

// AddrsToMaddrs parses a slice of string addresses into their multiaddress
// representation. It returns an error as soon as a single address fails to
// parse; in that case no partial result is returned.
func AddrsToMaddrs(addrs []string) ([]ma.Multiaddr, error) {
	maddrs := make([]ma.Multiaddr, 0, len(addrs))
	for _, addr := range addrs {
		maddr, err := ma.NewMultiaddr(addr)
		if err != nil {
			return nil, err
		}
		maddrs = append(maddrs, maddr)
	}

	return maddrs, nil
}

// FilterPrivateMaddrs strips private multiaddrs from the given peer address information.
func FilterPrivateMaddrs(pi peer.AddrInfo) peer.AddrInfo {
filtered := peer.AddrInfo{
Expand Down