Skip to content

Commit 3c1e35e

Browse files
committed
docs: code comments
1 parent 7e9ce0b commit 3c1e35e

6 files changed

Lines changed: 39 additions & 21 deletions

File tree

pkg/core/core.go

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
package core
22

33
import (
4+
"context"
45
"time"
56

67
"github.com/libp2p/go-libp2p/core/peer"
@@ -54,6 +55,14 @@ type Driver[I PeerInfo[I], R WorkResult[I]] interface {
5455
Close()
5556
}
5657

58+
// A Worker processes tasks of type T and returns results of type R or an error.
59+
// Workers are used to process a single peer or store a crawl result to the
60+
// database. It is the unit of concurrency in this system.
61+
type Worker[T any, R any] interface {
62+
// Work instructs the Worker to process the task and produce a result.
63+
Work(ctx context.Context, task T) (R, error)
64+
}
65+
5766
// Handler defines the interface that the engine will call every time
5867
// it has received a result from any of its workers.
5968
type Handler[I PeerInfo[I], R WorkResult[I]] interface {
@@ -94,6 +103,13 @@ type WorkResult[I PeerInfo[I]] interface {
94103
LogEntry() *log.Entry
95104
}
96105

106+
// Result is a generic result object. It captures a generic value or any
107+
// error that might have occurred when producing this result.
108+
type Result[R any] struct {
109+
Value R
110+
Error error
111+
}
112+
97113
// WriteResult must be returned by write workers.
98114
type WriteResult struct {
99115
*db.InsertVisitResult

pkg/core/engine.go

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -368,7 +368,10 @@ func (e *Engine[I, R]) handlePeerResult(result Result[R]) {
368368
}
369369

370370
// If we don't know any multi addresses for the peer yet, we push it
371-
// to the end of our priority queue by giving it a low priority.
371+
// to the end of our priority queue by giving it a low priority. If we
372+
// find that peer again in another routing table, we might find another
373+
// multi address. In that case, we update the set of addresses and
374+
// increase the priority.
372375
priority := 1
373376
if len(e.maddrFilter(task.Addrs())) == 0 {
374377
priority = 0

pkg/core/queue.go

Lines changed: 9 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -4,19 +4,20 @@ import (
44
"container/heap"
55
)
66

7-
// An item is something we manage in a priority queue.
8-
type item[T any] struct {
9-
value T // The value of the item; arbitrary.
10-
key string
11-
priority int // The priority of the item in the queue.
12-
index int // The index of the item in the heap.
13-
}
14-
7+
// PriorityQueue is can take unique items and pops them according to their priority.
158
type PriorityQueue[T any] struct {
169
queue *priorityQueue[T]
1710
lookup map[string]*item[T]
1811
}
1912

13+
// An item is something we manage in a priority queue.
14+
type item[T any] struct {
15+
value T // The value of the item; arbitrary.
16+
key string // The key belonging to the value
17+
priority int // The priority of the item in the queue.
18+
index int // The index of the item in the heap.
19+
}
20+
2021
func NewPriorityQueue[T any]() *PriorityQueue[T] {
2122
queue := make(priorityQueue[T], 0)
2223
return &PriorityQueue[T]{

pkg/core/worker.go

Lines changed: 5 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -5,10 +5,7 @@ import (
55
"sync"
66
)
77

8-
type Worker[T any, R any] interface {
9-
Work(ctx context.Context, task T) (R, error)
10-
}
11-
8+
// A Pool manages a set of Workers
129
type Pool[T any, R any] struct {
1310
start sync.Once
1411
results chan Result[R]
@@ -22,6 +19,10 @@ func NewPool[T any, R any](workers ...Worker[T, R]) *Pool[T, R] {
2219
}
2320
}
2421

22+
// Start takes a channel on which tasks will be scheduled. It is guaranteed that
23+
// the Pool reads from this channel as fast as possible. To stop the worker pool
24+
// you need to close the channel. To wait until all Workers have finished, wait
25+
// until the results channel returned from this method was closed as well.
2526
func (w *Pool[T, R]) Start(ctx context.Context, tasks <-chan T) <-chan Result[R] {
2627
w.start.Do(func() {
2728
var wg sync.WaitGroup
@@ -52,8 +53,3 @@ func (w *Pool[T, R]) Start(ctx context.Context, tasks <-chan T) <-chan Result[R]
5253
func (w *Pool[T, R]) Size() int {
5354
return len(w.workers)
5455
}
55-
56-
type Result[R any] struct {
57-
Value R
58-
Error error
59-
}

pkg/libp2p/crawler_p2p.go

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -74,7 +74,7 @@ func (c *Crawler) crawlP2P(ctx context.Context, pi PeerInfo) <-chan P2PResult {
7474
result.CrawlErrorStr = db.NetError(result.CrawlError)
7575
}
7676

77-
// wait for the Identify exchange to complete
77+
// wait for the Identify exchange to complete (no-op if already done)
7878
c.identifyWait(ctx, pi.AddrInfo)
7979

8080
// Extract information from peer store
@@ -189,7 +189,8 @@ func (c *Crawler) fetchNeighbors(ctx context.Context, pi peer.AddrInfo) (*core.R
189189
// This error happens in: https://github.com/libp2p/go-libp2p/blob/4e2a16dd3f4f980bf9429572b3d2aed885594ec4/p2p/host/basic/basic_host.go#L645
190190
if err.Error() == "connection failed" {
191191
// This means we were connected to the peer, tried to open
192-
// a stream but then failed to do so.
192+
// a stream but then failed to do so. Try to reconnect as
193+
// the peer appears to be online
193194

194195
select {
195196
case <-ctx.Done():

pkg/utils/utils.go

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -92,7 +92,8 @@ func FilterPublicMaddrs(maddrs []ma.Multiaddr) []ma.Multiaddr {
9292
return filtered
9393
}
9494

95-
// MergeMaddrs strips private multiaddrs from the given peer address information.
95+
// MergeMaddrs takes two slices of multi addresses and merges them into a single
96+
// one.
9697
func MergeMaddrs(maddrSet1 []ma.Multiaddr, maddrSet2 []ma.Multiaddr) []ma.Multiaddr {
9798
maddrSetOut := make(map[string]ma.Multiaddr, len(maddrSet1))
9899
for _, maddr := range maddrSet1 {

0 commit comments

Comments
 (0)