Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 17 additions & 1 deletion pkg/crawl/db.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"fmt"
"time"

"github.com/libp2p/go-libp2p/core/peer"
"github.com/pkg/errors"
log "github.com/sirupsen/logrus"
"github.com/volatiletech/null/v8"
Expand Down Expand Up @@ -70,7 +71,22 @@ func (s *Scheduler) persistNeighbors() {
}
i++
neighborsCount += len(routingTable.Neighbors)
if err := s.dbc.PersistNeighbors(s.crawl, p, routingTable.ErrorBits, routingTable.PeerIDs()); err != nil {

var dbPeerID *int
if id, found := s.peerMappings[p]; found {
dbPeerID = &id
}

dbPeerIDs := []int{}
peerIDs := []peer.ID{}
for _, n := range routingTable.Neighbors {
if id, found := s.peerMappings[n.ID]; found {
dbPeerIDs = append(dbPeerIDs, id)
} else {
peerIDs = append(peerIDs, n.ID)
}
}
if err := s.dbc.PersistNeighbors(s.crawl, dbPeerID, p, routingTable.ErrorBits, dbPeerIDs, peerIDs); err != nil {
log.WithError(err).WithField("peerID", utils.FmtPeerID(p)).Warnln("Could not persist neighbors")
}
}
Expand Down
16 changes: 9 additions & 7 deletions pkg/crawl/persister.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ func NewPersister(dbc *db.Client, conf *config.Config, crawl *models.Crawl) (*Pe

// StartPersisting enters an endless loop and consumes persist jobs from the persist queue
// until it is told to stop or the persist queue was closed.
func (p *Persister) StartPersisting(ctx context.Context, persistQueue *queue.FIFO[Result]) {
func (p *Persister) StartPersisting(ctx context.Context, persistQueue *queue.FIFO[Result], resultsQueue *queue.FIFO[*db.InsertVisitResult]) {
defer close(p.done)
for {
// Give the shutdown signal precedence
Expand All @@ -63,13 +63,15 @@ func (p *Persister) StartPersisting(ctx context.Context, persistQueue *queue.FIF
// The persist queue was closed
return
}
p.handlePersistJob(ctx, r)

ivr := p.handlePersistJob(ctx, r)
resultsQueue.Push(ivr)
}
}
}

// handlePersistJob takes a crawl result (persist job) and inserts a denormalized database entry of the results.
func (p *Persister) handlePersistJob(ctx context.Context, cr Result) {
func (p *Persister) handlePersistJob(ctx context.Context, cr Result) *db.InsertVisitResult {
logEntry := log.WithFields(log.Fields{
"persisterID": p.id,
"remoteID": utils.FmtPeerID(cr.Peer.ID),
Expand All @@ -79,7 +81,7 @@ func (p *Persister) handlePersistJob(ctx context.Context, cr Result) {

start := time.Now()

err := p.insertRawVisit(ctx, cr)
ivr, err := p.insertVisit(ctx, cr)
if err != nil && !errors.Is(ctx.Err(), context.Canceled) {
logEntry.WithError(err).Warnln("Error inserting raw visit")
} else {
Expand All @@ -90,11 +92,11 @@ func (p *Persister) handlePersistJob(ctx context.Context, cr Result) {
WithField("success", err == nil).
WithField("duration", time.Since(start)).
Infoln("Persisted result from worker", cr.CrawlerID)
return ivr
}

// insertRawVisit builds up a raw_visit database entry.
func (p *Persister) insertRawVisit(ctx context.Context, cr Result) error {

// insertVisit builds up a visit database entry.
func (p *Persister) insertVisit(ctx context.Context, cr Result) (*db.InsertVisitResult, error) {
return p.dbc.PersistCrawlVisit(
ctx,
p.dbc.Handle(),
Expand Down
79 changes: 52 additions & 27 deletions pkg/crawl/scheduler.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,11 +60,15 @@ type Scheduler struct {

// The queue that the crawlers publish their results on, so that the scheduler can handle them,
// e.g. update the maps above etc.
resultsQueue *queue.FIFO[Result]
crawlResultsQueue *queue.FIFO[Result]

// A queue that takes crawl results and gets consumed by persisters that save the data into the DB.
persistQueue *queue.FIFO[Result]

// The queue that the persisters publish their results on, so that the peerID -> database id mapping (peerMappings)
// can be built.
persistResultsQueue *queue.FIFO[*db.InsertVisitResult]

// A map of agent versions and their occurrences that happened during the crawl.
agentVersion map[string]int

Expand All @@ -76,6 +80,12 @@ type Scheduler struct {

// A map that keeps track of all k-bucket entries of a particular peer.
routingTables map[peer.ID]*RoutingTable

// A map that maps peer IDs to their database IDs. This speeds up the insertion of neighbor information as
// the database does not need to look up every peer ID but only the ones not yet present in the database.
// Speed up for ~11k peers: 5.5 min -> 30s
// TODO: Disable maintenance of map if dry-run is active
peerMappings map[peer.ID]int
}

// NewScheduler initializes a new libp2p host and scheduler instance.
Expand All @@ -100,18 +110,20 @@ func NewScheduler(ctx context.Context, conf *config.Config, dbc *db.Client) (*Sc
}

s := &Scheduler{
host: h,
dbc: dbc,
config: conf,
inCrawlQueue: map[peer.ID]peer.AddrInfo{},
crawled: map[peer.ID]peer.AddrInfo{},
crawlQueue: queue.NewFIFO[peer.AddrInfo](),
resultsQueue: queue.NewFIFO[Result](),
persistQueue: queue.NewFIFO[Result](),
agentVersion: map[string]int{},
protocols: map[string]int{},
errors: map[string]int{},
routingTables: map[peer.ID]*RoutingTable{},
host: h,
dbc: dbc,
config: conf,
inCrawlQueue: map[peer.ID]peer.AddrInfo{},
crawled: map[peer.ID]peer.AddrInfo{},
crawlQueue: queue.NewFIFO[peer.AddrInfo](),
crawlResultsQueue: queue.NewFIFO[Result](),
persistQueue: queue.NewFIFO[Result](),
persistResultsQueue: queue.NewFIFO[*db.InsertVisitResult](),
agentVersion: map[string]int{},
protocols: map[string]int{},
errors: map[string]int{},
routingTables: map[peer.ID]*RoutingTable{},
peerMappings: map[peer.ID]int{},
}

return s, nil
Expand Down Expand Up @@ -159,7 +171,7 @@ func (s *Scheduler) CrawlNetwork(ctx context.Context, bootstrap []peer.AddrInfo)
s.readResultsQueue(ctx)

// Indicate that we won't publish any new crawl tasks to the queue.
// TODO: This can still leak a Go routine. However we're exiting here anyway...
// TODO: This can still leak a Go routine. However, we're exiting here anyway...
s.crawlQueue.DoneProducing()

// Stop crawlers - blocking
Expand All @@ -170,8 +182,8 @@ func (s *Scheduler) CrawlNetwork(ctx context.Context, bootstrap []peer.AddrInfo)
}

// Indicate that the crawlers won't send any new results as they are now stopped.
// TODO: This can still leak a Go routine. However we're exiting here anyway...
s.resultsQueue.DoneProducing()
// TODO: This can still leak a Go routine. However, we're exiting here anyway...
s.crawlResultsQueue.DoneProducing()

// Indicate that we won't send any more results to the persisters. This will
// lead the persisters to consume the queue until the end and then stop automatically,
Expand All @@ -192,9 +204,14 @@ func (s *Scheduler) CrawlNetwork(ctx context.Context, bootstrap []peer.AddrInfo)
<-p.done
}

// Indicate that the persisters won't send any new results as they are now stopped.
// TODO: This can still leak a Go routine. However we're exiting here anyway...
s.persistResultsQueue.DoneProducing()

// Finally, log the crawl summary
defer s.logSummary()

// Return early if we are in a dry-run
if s.dbc == nil {
return nil
}
Expand All @@ -204,10 +221,12 @@ func (s *Scheduler) CrawlNetwork(ctx context.Context, bootstrap []peer.AddrInfo)
return errors.Wrap(err, "persist crawl")
}

// Persist associated crawl properties
if err := s.persistCrawlProperties(context.Background()); err != nil {
return errors.Wrap(err, "persist crawl properties")
}

// persist all neighbor information
s.persistNeighbors()

return nil
Expand Down Expand Up @@ -264,7 +283,7 @@ func (s *Scheduler) startCrawlers(ctx context.Context) ([]*Crawler, context.Cont
return nil, nil, nil, errors.Wrap(err, "new crawler")
}
crawlers = append(crawlers, c)
go c.StartCrawling(crawlerCtx, s.crawlQueue, s.resultsQueue)
go c.StartCrawling(crawlerCtx, s.crawlQueue, s.crawlResultsQueue)
}

return crawlers, crawlerCtx, crawlerCancel, nil
Expand All @@ -287,17 +306,16 @@ func (s *Scheduler) startPersisters(ctx context.Context) ([]*Persister, context.
return nil, nil, errors.Wrap(err, "new persister")
}
persisters = append(persisters, p)
go p.StartPersisting(persistersCtx, s.persistQueue)
go p.StartPersisting(persistersCtx, s.persistQueue, s.persistResultsQueue)
}

return persisters, persistersCancel, nil
}

// readResultsQueue listens for crawl results on the resultsQueue and handles any
// readResultsQueue listens for crawl results on the crawlResultsQueue and handles any
// entries in handleResult. If the scheduler is asked to shut down it
// breaks out of this loop and the clean-up routines above take over.
func (s *Scheduler) readResultsQueue(ctx context.Context) {
var result Result
for {
// Give the shutdown signal precedence
select {
Expand All @@ -309,18 +327,25 @@ func (s *Scheduler) readResultsQueue(ctx context.Context) {
select {
case <-ctx.Done():
return
case r, ok := <-s.resultsQueue.Consume():
case r, ok := <-s.crawlResultsQueue.Consume():
if !ok {
return
}
result = r
}

s.handleResult(ctx, result)
s.handleResult(ctx, r)

// If the queue is empty, or we have reached the configured limit we stop the crawl.
if len(s.inCrawlQueue) == 0 || s.config.ReachedCrawlLimit(len(s.crawled)) {
return
// If the queue is empty, or we have reached the configured limit we stop the crawl.
if len(s.inCrawlQueue) == 0 || s.config.ReachedCrawlLimit(len(s.crawled)) {
return
}
case r, ok := <-s.persistResultsQueue.Consume():
if !ok {
return
}

if r.PeerID != nil {
s.peerMappings[r.PID] = *r.PeerID
}
}
}
}
Expand Down
79 changes: 72 additions & 7 deletions pkg/db/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -281,7 +281,7 @@ func (c *Client) PersistCrawlVisit(
visitEndedAt time.Time,
connectErrorStr string,
crawlErrorStr string,
) error {
) (*InsertVisitResult, error) {
var agentVersionID, protocolsSetID *int
var avidErr, psidErr error

Expand Down Expand Up @@ -505,7 +505,7 @@ func (c *Client) PersistDialVisit(
visitStartedAt time.Time,
visitEndedAt time.Time,
errorStr string,
) error {
) (*InsertVisitResult, error) {
return c.insertVisit(
nil,
peerID,
Expand Down Expand Up @@ -537,7 +537,7 @@ func (c *Client) insertVisit(
visitType string,
connectErrorStr string,
crawlErrorStr string,
) error {
) (*InsertVisitResult, error) {
maddrStrs := utils.MaddrsToAddrs(maddrs)

start := time.Now()
Expand All @@ -561,9 +561,72 @@ func (c *Client) insertVisit(
"success": strconv.FormatBool(err == nil),
}).Observe(time.Since(start).Seconds())
if err != nil {
return err
return nil, err
}
return rows.Close()

defer func() {
if err := rows.Close(); err != nil {
log.WithError(err).Warnln("Could not close rows")
}
}()

ivr := InsertVisitResult{
PID: peerID,
}
if !rows.Next() {
return &ivr, nil
}

if err = rows.Scan(&ivr); err != nil {
return nil, err
}

return &ivr, nil
}

// InsertVisitResult carries the database identifiers produced by inserting a
// visit. Scan parses it from the composite value returned by the database, and
// the scheduler uses PID/PeerID to maintain its peer.ID -> database-ID mapping.
type InsertVisitResult struct {
	// PID is the libp2p peer ID of the visited peer.
	PID peer.ID
	// PeerID is the database ID of the peer; nil when the database did not return one.
	PeerID *int
	// VisitID is the database ID of the inserted visit row; nil when not returned.
	VisitID *int
	// SessionID is the database ID of the associated session; nil when not returned.
	SessionID *int
}

// Scan implements the sql.Scanner interface for InsertVisitResult. It expects
// value to be a []byte holding a three-field composite literal of the form
// "(peerID,visitID,sessionID)", where each field may be empty. Non-empty
// fields are parsed as integers into the corresponding pointer fields; empty
// fields leave the pointer nil.
func (ivr *InsertVisitResult) Scan(value interface{}) error {
	data, ok := value.([]byte)
	if !ok {
		return fmt.Errorf("incompatible type %T", value)
	}

	// Guard against malformed or empty input: the slice expression below
	// strips the surrounding parentheses and would panic on len(data) < 2.
	if len(data) < 2 {
		return fmt.Errorf("unexpected return value: %s", string(data))
	}

	parts := strings.Split(string(data[1:len(data)-1]), ",")
	if len(parts) != 3 {
		return fmt.Errorf("unexpected number of return values: %s", string(data))
	}

	// The three composite fields map positionally onto the ID pointers.
	fields := []struct {
		name string
		dst  **int
	}{
		{"peer", &ivr.PeerID},
		{"visit", &ivr.VisitID},
		{"session", &ivr.SessionID},
	}
	for i, f := range fields {
		if parts[i] == "" {
			continue
		}
		id, err := strconv.Atoi(parts[i])
		if err != nil {
			return fmt.Errorf("invalid db %s id %s", f.name, parts[i])
		}
		*f.dst = &id
	}

	return nil
}

func durationToInterval(dur *time.Duration) *string {
Expand All @@ -574,16 +637,18 @@ func durationToInterval(dur *time.Duration) *string {
return &s
}

func (c *Client) PersistNeighbors(crawl *models.Crawl, peerID peer.ID, errorBits uint16, neighbors []peer.ID) error {
func (c *Client) PersistNeighbors(crawl *models.Crawl, dbPeerID *int, peerID peer.ID, errorBits uint16, dbNeighborsIDs []int, neighbors []peer.ID) error {
neighborMHashes := make([]string, len(neighbors))
for i, neighbor := range neighbors {
neighborMHashes[i] = neighbor.String()
}
// postgres does not support unsigned integers. So we interpret the uint16 as an int16
bitMask := *(*int16)(unsafe.Pointer(&errorBits))
rows, err := queries.Raw("SELECT insert_neighbors($1, $2, $3, $4)",
rows, err := queries.Raw("SELECT insert_neighbors($1, $2, $3, $4, $5, $6)",
crawl.ID,
dbPeerID,
peerID.String(),
fmt.Sprintf("{%s}", strings.Trim(strings.Join(strings.Split(fmt.Sprint(dbNeighborsIDs), " "), ","), "[]")),
fmt.Sprintf("{%s}", strings.Join(neighborMHashes, ",")),
bitMask,
).Query(c.dbh)
Expand Down
Loading