Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -256,3 +256,5 @@ out
.tool-versions
**/*.dat
**/*.mmdb
*.logs
*.log
2 changes: 1 addition & 1 deletion cmd/nebula/cmd_crawl.go
Original file line number Diff line number Diff line change
Expand Up @@ -443,7 +443,7 @@ func storeNeighbors[I core.PeerInfo[I]](ctx context.Context, dbc db.Client, dbCr
}
}
log.WithFields(log.Fields{
"duration": time.Since(start),
"duration": time.Since(start).String(),
"avg": fmt.Sprintf("%.2fms", time.Since(start).Seconds()/float64(len(handler.RoutingTables))*1000),
"peers": len(handler.RoutingTables),
"totalNeighbors": neighborsCount,
Expand Down
3 changes: 1 addition & 2 deletions core/engine.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,6 @@ import (
"go.opentelemetry.io/otel/metric"
mnoop "go.opentelemetry.io/otel/metric/noop"
"go.opentelemetry.io/otel/trace"
tnoop "go.opentelemetry.io/otel/trace/noop"

"github.com/dennis-tra/nebula-crawler/config"
"github.com/dennis-tra/nebula-crawler/tele"
Expand Down Expand Up @@ -57,7 +56,7 @@ func DefaultEngineConfig() *EngineConfig {
DuplicateProcessing: false,
AddrDialType: config.AddrTypeAny,
MeterProvider: mnoop.NewMeterProvider(),
TracerProvider: tnoop.NewTracerProvider(),
TracerProvider: trace.NewNoopTracerProvider(),
}
}

Expand Down
4 changes: 2 additions & 2 deletions db/client_db_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ import (
"github.com/volatiletech/sqlboiler/v4/boil"
"github.com/volatiletech/sqlboiler/v4/queries/qm"
mnoop "go.opentelemetry.io/otel/metric/noop"
tnoop "go.opentelemetry.io/otel/trace/noop"
"go.opentelemetry.io/otel/trace"

"github.com/dennis-tra/nebula-crawler/config"
"github.com/dennis-tra/nebula-crawler/db/models"
Expand Down Expand Up @@ -66,7 +66,7 @@ func setup(t *testing.T) (context.Context, *DBClient, func(t *testing.T)) {
ProtocolsCacheSize: 100,
ProtocolsSetCacheSize: 100,
MeterProvider: mnoop.NewMeterProvider(),
TracerProvider: tnoop.NewTracerProvider(),
TracerProvider: trace.NewNoopTracerProvider(),
}

client, err := InitDBClient(ctx, &c)
Expand Down
2 changes: 0 additions & 2 deletions db/errors.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,6 @@ var KnownErrors = map[string]string{
"can't assign requested address": models.NetErrorCantAssignRequestedAddress, // transient error
"cannot assign requested address": models.NetErrorCantAssignRequestedAddress, // transient error
"connection gated": models.NetErrorConnectionGated, // transient error
"connection closed immediately": models.NetErrorConnectionClosedImmediately,
}

var ErrorStr = map[string]string{}
Expand Down Expand Up @@ -75,7 +74,6 @@ var knownErrorsPrecedence = []string{
"failed to negotiate stream multiplexer",
"resource limit exceeded",
"Write on stream",
"connection closed immediately",
}

// NetError extracts the appropriate error type from the given error.
Expand Down
1 change: 0 additions & 1 deletion db/migrations/000026_add_net_errors.up.sql
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@ BEGIN;
ALTER TYPE net_error ADD VALUE 'connection_reset_by_peer';
ALTER TYPE net_error ADD VALUE 'cant_assign_requested_address';
ALTER TYPE net_error ADD VALUE 'connection_gated';
ALTER TYPE net_error ADD VALUE 'connection_closed_immediately';
ALTER TYPE net_error RENAME VALUE 'no_public_ip' TO 'no_ip_address';

CREATE OR REPLACE FUNCTION calc_max_failed_visits(
Expand Down
46 changes: 22 additions & 24 deletions db/models/boil_types.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

121 changes: 76 additions & 45 deletions discv5/crawler.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,21 @@ func (c *Crawler) Work(ctx context.Context, task PeerInfo) (core.CrawlResult[Pee
libp2pResult := <-libp2pResultCh
discV5Result := <-discV5ResultCh

properties := c.PeerProperties(task.Node)

if libp2pResult.ConnClosedImmediately {
properties["direct_close"] = true
}

if libp2pResult.GenTCPAddr {
properties["gen_tcp_addr"] = true
}

data, err := json.Marshal(properties)
if err != nil {
log.WithError(err).WithField("properties", properties).Warnln("Could not marshal peer properties")
}

cr := core.CrawlResult[PeerInfo]{
CrawlerID: c.id,
Info: task,
Expand All @@ -75,7 +90,7 @@ func (c *Crawler) Work(ctx context.Context, task PeerInfo) (core.CrawlResult[Pee
CrawlEndTime: time.Now(),
ConnectStartTime: libp2pResult.ConnectStartTime,
ConnectEndTime: libp2pResult.ConnectEndTime,
Properties: c.PeerProperties(task.Node),
Properties: data,
LogErrors: c.cfg.LogErrors,
}

Expand All @@ -85,7 +100,7 @@ func (c *Crawler) Work(ctx context.Context, task PeerInfo) (core.CrawlResult[Pee
return cr, nil
}

func (c *Crawler) PeerProperties(node *enode.Node) json.RawMessage {
func (c *Crawler) PeerProperties(node *enode.Node) map[string]any {
properties := map[string]any{}

properties["seq"] = node.Record().Seq()
Expand Down Expand Up @@ -119,23 +134,19 @@ func (c *Crawler) PeerProperties(node *enode.Node) json.RawMessage {
properties["enr"] = node.String()
}

data, err := json.Marshal(properties)
if err != nil {
log.WithError(err).WithField("properties", properties).Warnln("Could not marshal peer properties")
return nil
}

return data
return properties
}

type Libp2pResult struct {
ConnectStartTime time.Time
ConnectEndTime time.Time
ConnectError error
ConnectErrorStr string
Agent string
Protocols []string
ListenAddrs []ma.Multiaddr
ConnectStartTime time.Time
ConnectEndTime time.Time
ConnectError error
ConnectErrorStr string
Agent string
Protocols []string
ListenAddrs []ma.Multiaddr
ConnClosedImmediately bool // whether conn was no error but still unconnected
GenTCPAddr bool // whether a TCP address was generated
}

func (c *Crawler) crawlLibp2p(ctx context.Context, pi PeerInfo) chan Libp2pResult {
Expand All @@ -144,9 +155,16 @@ func (c *Crawler) crawlLibp2p(ctx context.Context, pi PeerInfo) chan Libp2pResul
go func() {
result := Libp2pResult{}

// sanitize the given addresses like removing UDP-only addresses and
// adding corresponding TCP addresses.
sanitizedAddrs, generated := sanitizeAddrs(pi.Addrs())

// keep track if we generated a TCP address to dial
result.GenTCPAddr = generated

addrInfo := peer.AddrInfo{
ID: pi.ID(),
Addrs: pi.Addrs(),
Addrs: sanitizedAddrs,
}

result.ConnectStartTime = time.Now()
Expand All @@ -156,6 +174,22 @@ func (c *Crawler) crawlLibp2p(ctx context.Context, pi PeerInfo) chan Libp2pResul
// If we could successfully connect to the peer we actually crawl it.
if result.ConnectError == nil {

// check if we're connected
if c.host.Network().Connectedness(pi.ID()) == network.NotConnected {
// this is a weird behavior I was observing. Libp2p reports a
// successful connection establishment but isn't connected right
// after the call returned. This is not a big problem at this
// point because fetchNeighbors will open the connection again. This
// works more often than not but is still weird. At least keep track
// of these cases.
result.ConnClosedImmediately = true

// try it again one more time
if !c.isIdentified(addrInfo.ID) {
_ = c.connect(ctx, addrInfo)
}
}

// wait for the Identify exchange to complete
c.identifyWait(ctx, addrInfo)

Expand Down Expand Up @@ -207,11 +241,6 @@ func (c *Crawler) connect(ctx context.Context, pi peer.AddrInfo) error {
return fmt.Errorf("skipping node as it has no public IP address") // change knownErrs map if changing this msg
}

dialAddrInfo := peer.AddrInfo{
ID: pi.ID,
Addrs: sanitizeAddrs(pi.Addrs),
}

var (
retry int = 0
maxRetries int = 2
Expand All @@ -221,22 +250,16 @@ func (c *Crawler) connect(ctx context.Context, pi peer.AddrInfo) error {
for {
logEntry := log.WithFields(log.Fields{
"timeout": c.cfg.DialTimeout.String(),
"remoteID": dialAddrInfo.ID.String(),
"remoteID": pi.ID.String(),
"retry": retry,
"maddrs": dialAddrInfo.Addrs,
"maddrs": pi.Addrs,
})
logEntry.Debugln("Connecting to peer", dialAddrInfo.ID.ShortString())
logEntry.Debugln("Connecting to peer", pi.ID.ShortString())

timeoutCtx, cancel := context.WithTimeout(ctx, c.cfg.DialTimeout)
err := c.host.Connect(timeoutCtx, dialAddrInfo)
err := c.host.Connect(timeoutCtx, pi)
cancel()

// if libp2p says we established a connection, but we're not actually
// connected, assign a custom error.
if err == nil && c.host.Network().Connectedness(pi.ID) != network.Connected {
err = fmt.Errorf("connection closed immediately")
}

// if we still don't have an error (despite the above custom error
// handling), we return to the caller.
if err == nil {
Expand All @@ -247,29 +270,31 @@ func (c *Crawler) connect(ctx context.Context, pi peer.AddrInfo) error {
// because subsequent connection attempts have a shorter timeout which
// means that it's more likely to run into a context.DeadlineExceeded
// error. If that's the case, we return the original error for tracking
// purposes.
// purposes. If the error is nil, we're not connected and not identified
// firstErr will stay nil. This is fine because we'll track that outside
// of this connect call.
if firstErr == nil {
firstErr = err
}

switch true {
case strings.Contains(err.Error(), db.ErrorStr[models.NetErrorNegotiateSecurityProtocol]):
case strings.Contains(err.Error(), db.ErrorStr[models.NetErrorConnectionRefused]):
case strings.Contains(err.Error(), db.ErrorStr[models.NetErrorConnectionResetByPeer]):
case strings.Contains(err.Error(), db.ErrorStr[models.NetErrorConnectionClosedImmediately]):
case strings.Contains(err.Error(), db.ErrorStr[models.NetErrorConnectionGated]):
case strings.Contains(err.Error(), db.ErrorStr[models.NetErrorCantAssignRequestedAddress]):
// case strings.Contains(err.Error(), db.ErrorStr[models.NetErrorConnectionResetByPeer]): often because of mismatching peer ids, so excluding it for now
default:
if errors.Is(err, context.DeadlineExceeded) {
err = firstErr
}
logEntry.WithError(err).Debugln("Failed connecting to peer", dialAddrInfo.ID.ShortString())
logEntry.WithError(err).Debugln("Failed connecting to peer", pi.ID.ShortString())
return err
}

if retry == maxRetries {
if errors.Is(err, context.DeadlineExceeded) {
err = firstErr
}
logEntry.WithError(err).Debugln("Exceeded retries connecting to peer", dialAddrInfo.ID.ShortString())
logEntry.WithError(err).Debugln("Exceeded retries connecting to peer", pi.ID.ShortString())
return err
}

Expand All @@ -291,7 +316,7 @@ func (c *Crawler) connect(ctx context.Context, pi peer.AddrInfo) error {
// there is no other reliable transport address like TCP or QUIC we use the UDP
// IP address + port and craft a TCP address out of it. The UDP address will
// still be removed and replaced with TCP.
func sanitizeAddrs(maddrs []ma.Multiaddr) []ma.Multiaddr {
func sanitizeAddrs(maddrs []ma.Multiaddr) ([]ma.Multiaddr, bool) {
newMaddrs := make([]ma.Multiaddr, 0, len(maddrs))
for _, maddr := range maddrs {
if _, err := maddr.ValueForProtocol(ma.P_TCP); err == nil {
Expand All @@ -306,7 +331,7 @@ func sanitizeAddrs(maddrs []ma.Multiaddr) []ma.Multiaddr {
}

if len(newMaddrs) > 0 {
return newMaddrs
return newMaddrs, false
}

for i, maddr := range maddrs {
Expand Down Expand Up @@ -338,10 +363,10 @@ func sanitizeAddrs(maddrs []ma.Multiaddr) []ma.Multiaddr {

newMaddrs = append(newMaddrs, tcpMaddr)

return newMaddrs
return newMaddrs, true
}

return maddrs
return maddrs, false
}

// identifyWait waits until any connection to a peer passed the Identify
Expand All @@ -350,7 +375,7 @@ func sanitizeAddrs(maddrs []ma.Multiaddr) []ma.Multiaddr {
// identified in the past. We detect a successful identification if an
// AgentVersion is stored in the peer store
func (c *Crawler) identifyWait(ctx context.Context, pi peer.AddrInfo) {
timeoutCtx, cancel := context.WithTimeout(ctx, 15*time.Second) // TODO: parameterize
timeoutCtx, cancel := context.WithTimeout(ctx, 5*time.Second) // TODO: parameterize
defer cancel()

var wg sync.WaitGroup
Expand All @@ -368,8 +393,7 @@ func (c *Crawler) identifyWait(ctx context.Context, pi peer.AddrInfo) {
// check if identification was successful by looking for
// the AgentVersion key. If it exists, we cancel the
// identification of the remaining connections.
agent, err := c.host.Peerstore().Get(pi.ID, "AgentVersion")
if err == nil && agent.(string) != "" {
if c.isIdentified(pi.ID) {
cancel()
return
}
Expand All @@ -380,6 +404,13 @@ func (c *Crawler) identifyWait(ctx context.Context, pi peer.AddrInfo) {
wg.Wait()
}

// isIdentified reports whether the given peer.ID was successfully identified.
// A peer counts as identified when the host's peer store holds a non-empty
// string under the "AgentVersion" key for it. Just because identifyWait
// returns doesn't mean the peer was identified, so callers use this to check.
func (c *Crawler) isIdentified(pid peer.ID) bool {
	val, err := c.host.Peerstore().Get(pid, "AgentVersion")
	if err != nil {
		return false
	}

	// Use the comma-ok form so that an unexpected value type stored under
	// the key is treated as "not identified" instead of panicking.
	agent, ok := val.(string)
	return ok && agent != ""
}

type DiscV5Result struct {
// The time we received the first successful response
RespondedAt *time.Time
Expand Down
Loading