Skip to content

Commit 22f4959

Browse files
committed
init as many libp2p hosts as available CPU cores
1 parent 8bbbbca commit 22f4959

2 files changed

Lines changed: 106 additions & 70 deletions

File tree

discv5/driver_crawler.go

Lines changed: 72 additions & 50 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@ import (
77
"fmt"
88
"io"
99
"net"
10+
"runtime"
1011
"time"
1112

1213
secp256k1v4 "github.com/decred/dcrd/dcrec/secp256k1/v4"
@@ -15,6 +16,7 @@ import (
1516
"github.com/ethereum/go-ethereum/p2p/enode"
1617
"github.com/libp2p/go-libp2p"
1718
"github.com/libp2p/go-libp2p-mplex"
19+
"github.com/libp2p/go-libp2p/core/connmgr"
1820
"github.com/libp2p/go-libp2p/core/crypto"
1921
"github.com/libp2p/go-libp2p/core/host"
2022
"github.com/libp2p/go-libp2p/core/network"
@@ -141,7 +143,7 @@ func (cfg *CrawlDriverConfig) WriterConfig() *core.CrawlWriterConfig {
141143
type CrawlDriver struct {
142144
cfg *CrawlDriverConfig
143145
dbc db.Client
144-
host host.Host
146+
hosts []host.Host
145147
dbCrawl *models.Crawl
146148
tasksChan chan PeerInfo
147149
peerstore *enode.DB
@@ -153,53 +155,16 @@ type CrawlDriver struct {
153155
var _ core.Driver[PeerInfo, core.CrawlResult[PeerInfo]] = (*CrawlDriver)(nil)
154156

155157
func NewCrawlDriver(dbc db.Client, crawl *models.Crawl, cfg *CrawlDriverConfig) (*CrawlDriver, error) {
156-
// Configure the resource manager to not limit anything
157-
limiter := rcmgr.NewFixedLimiter(rcmgr.InfiniteLimits)
158-
rm, err := rcmgr.NewResourceManager(limiter)
159-
if err != nil {
160-
return nil, fmt.Errorf("new resource manager: %w", err)
161-
}
162-
163-
ecdsaKey, err := ecdsa.GenerateKey(ethcrypto.S256(), crand.Reader)
164-
if err != nil {
165-
return nil, fmt.Errorf("generate secp256k1 key: %w", err)
166-
}
167-
168-
privBytes := elliptic.Marshal(ethcrypto.S256(), ecdsaKey.X, ecdsaKey.Y)
169-
secpKey := (*crypto.Secp256k1PrivateKey)(secp256k1v4.PrivKeyFromBytes(privBytes))
170-
171-
// Initialize a single libp2p node that's shared between all crawlers.
172-
// Context: https://github.com/ethereum/consensus-specs/blob/dev/specs/phase0/p2p-interface.md#network-fundamentals
173-
h, err := libp2p.New(
174-
libp2p.NoListenAddrs,
175-
libp2p.ResourceManager(rm),
176-
libp2p.Identity(secpKey),
177-
libp2p.Security(noise.ID, noise.New),
178-
libp2p.UserAgent("nebula/"+cfg.Version),
179-
libp2p.Transport(tcp.NewTCPTransport),
180-
libp2p.Muxer(mplex.ID, mplex.DefaultTransport),
181-
libp2p.Muxer(yamux.ID, yamux.DefaultTransport),
182-
libp2p.DisableMetrics(),
183-
)
184-
if err != nil {
185-
return nil, fmt.Errorf("new libp2p host: %w", err)
158+
// create a libp2p host per CPU core to distribute load
159+
hosts := make([]host.Host, 0, runtime.NumCPU())
160+
for i := 0; i < runtime.NumCPU(); i++ {
161+
h, err := newLibp2pHost(cfg.Version)
162+
if err != nil {
163+
return nil, fmt.Errorf("new libp2p host: %w", err)
164+
}
165+
hosts = append(hosts, h)
186166
}
187167

188-
// According to Diva, these are required protocols. Some of them are just
189-
// assumed to be required. We just read from the stream indefinitely to
190-
// gain time for the identify exchange to finish. We just pretend to support
191-
// these protocols and keep the stream busy until we have gathered all the
192-
// information we were interested in. This includes the agend version and
193-
// all supported protocols.
194-
h.SetStreamHandler("/eth2/beacon_chain/req/ping/1/ssz_snappy", func(s network.Stream) { io.ReadAll(s) })
195-
h.SetStreamHandler("/eth2/beacon_chain/req/status/1/ssz_snappy", func(s network.Stream) { io.ReadAll(s) })
196-
h.SetStreamHandler("/eth2/beacon_chain/req/metadata/1/ssz_snappy", func(s network.Stream) { io.ReadAll(s) })
197-
h.SetStreamHandler("/eth2/beacon_chain/req/metadata/2/ssz_snappy", func(s network.Stream) { io.ReadAll(s) })
198-
h.SetStreamHandler("/eth2/beacon_chain/req/goodbye/1/ssz_snappy", func(s network.Stream) { io.ReadAll(s) })
199-
h.SetStreamHandler("/meshsub/1.1.0", func(s network.Stream) { io.ReadAll(s) }) // https://github.com/ethereum/consensus-specs/blob/dev/specs/phase0/p2p-interface.md#the-gossip-domain-gossipsub
200-
201-
log.WithField("peerID", h.ID().String()).Infoln("Started libp2p host")
202-
203168
nodesMap := map[enode.ID]*enode.Node{}
204169
for _, enrs := range cfg.BootstrapPeerStrs {
205170
n, err := enode.Parse(enode.ValidSchemes, enrs)
@@ -227,7 +192,7 @@ func NewCrawlDriver(dbc db.Client, crawl *models.Crawl, cfg *CrawlDriverConfig)
227192
return &CrawlDriver{
228193
cfg: cfg,
229194
dbc: dbc,
230-
host: h,
195+
hosts: hosts,
231196
dbCrawl: crawl,
232197
tasksChan: tasksChan,
233198
peerstore: peerstore,
@@ -259,10 +224,13 @@ func (d *CrawlDriver) NewWorker() (core.Worker[PeerInfo, core.CrawlResult[PeerIn
259224
return nil, fmt.Errorf("listen discv5: %w", err)
260225
}
261226

227+
// evenly assign libp2p hosts to crawler workers (round-robin by worker index)
228+
h := d.hosts[d.crawlerCount%len(d.hosts)]
229+
262230
c := &Crawler{
263231
id: fmt.Sprintf("crawler-%02d", d.crawlerCount),
264232
cfg: d.cfg.CrawlerConfig(),
265-
host: d.host.(*basichost.BasicHost),
233+
host: h.(*basichost.BasicHost),
266234
listener: listener,
267235
done: make(chan struct{}),
268236
}
@@ -293,7 +261,61 @@ func (d *CrawlDriver) Close() {
293261
c.listener.Close()
294262
}
295263

296-
if err := d.host.Close(); err != nil {
297-
log.WithError(err).Warnln("Failed closing libp2p host")
264+
for _, h := range d.hosts {
265+
if err := h.Close(); err != nil {
266+
log.WithError(err).WithField("localID", h.ID().String()).Warnln("Failed closing libp2p host")
267+
}
268+
}
269+
}
270+
271+
func newLibp2pHost(version string) (host.Host, error) {
272+
// Configure the resource manager to not limit anything
273+
limiter := rcmgr.NewFixedLimiter(rcmgr.InfiniteLimits)
274+
rm, err := rcmgr.NewResourceManager(limiter)
275+
if err != nil {
276+
return nil, fmt.Errorf("new resource manager: %w", err)
277+
}
278+
279+
ecdsaKey, err := ecdsa.GenerateKey(ethcrypto.S256(), crand.Reader)
280+
if err != nil {
281+
return nil, fmt.Errorf("generate secp256k1 key: %w", err)
282+
}
283+
284+
privBytes := elliptic.Marshal(ethcrypto.S256(), ecdsaKey.X, ecdsaKey.Y)
285+
secpKey := (*crypto.Secp256k1PrivateKey)(secp256k1v4.PrivKeyFromBytes(privBytes))
286+
287+
// Initialize a libp2p host. One host is created per CPU core, and each is shared by a subset of the crawlers.
288+
// Context: https://github.com/ethereum/consensus-specs/blob/dev/specs/phase0/p2p-interface.md#network-fundamentals
289+
h, err := libp2p.New(
290+
libp2p.NoListenAddrs,
291+
libp2p.ResourceManager(rm),
292+
libp2p.Identity(secpKey),
293+
libp2p.Security(noise.ID, noise.New),
294+
libp2p.UserAgent("nebula/"+version),
295+
libp2p.Transport(tcp.NewTCPTransport),
296+
libp2p.Muxer(mplex.ID, mplex.DefaultTransport),
297+
libp2p.Muxer(yamux.ID, yamux.DefaultTransport),
298+
libp2p.DisableMetrics(),
299+
libp2p.ConnectionManager(connmgr.NullConnMgr{}),
300+
)
301+
if err != nil {
302+
return nil, fmt.Errorf("new libp2p host: %w", err)
298303
}
304+
305+
// According to Diva, these are required protocols. Some of them are just
306+
// assumed to be required. We just read from the stream indefinitely to
307+
// gain time for the identify exchange to finish. We just pretend to support
308+
// these protocols and keep the stream busy until we have gathered all the
309+
// information we were interested in. This includes the agent version and
310+
// all supported protocols.
311+
h.SetStreamHandler("/eth2/beacon_chain/req/ping/1/ssz_snappy", func(s network.Stream) { io.ReadAll(s) })
312+
h.SetStreamHandler("/eth2/beacon_chain/req/status/1/ssz_snappy", func(s network.Stream) { io.ReadAll(s) })
313+
h.SetStreamHandler("/eth2/beacon_chain/req/metadata/1/ssz_snappy", func(s network.Stream) { io.ReadAll(s) })
314+
h.SetStreamHandler("/eth2/beacon_chain/req/metadata/2/ssz_snappy", func(s network.Stream) { io.ReadAll(s) })
315+
h.SetStreamHandler("/eth2/beacon_chain/req/goodbye/1/ssz_snappy", func(s network.Stream) { io.ReadAll(s) })
316+
h.SetStreamHandler("/meshsub/1.1.0", func(s network.Stream) { io.ReadAll(s) }) // https://github.com/ethereum/consensus-specs/blob/dev/specs/phase0/p2p-interface.md#the-gossip-domain-gossipsub
317+
318+
log.WithField("peerID", h.ID().String()).Infoln("Started libp2p host")
319+
320+
return h, nil
299321
}

libp2p/driver_crawler.go

Lines changed: 34 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -2,10 +2,13 @@ package libp2p
22

33
import (
44
"fmt"
5+
"runtime"
56
"time"
67

78
"github.com/libp2p/go-libp2p"
89
pb "github.com/libp2p/go-libp2p-kad-dht/pb"
10+
"github.com/libp2p/go-libp2p/core/connmgr"
11+
"github.com/libp2p/go-libp2p/core/host"
912
"github.com/libp2p/go-libp2p/core/peer"
1013
"github.com/libp2p/go-libp2p/core/protocol"
1114
basichost "github.com/libp2p/go-libp2p/p2p/host/basic"
@@ -81,7 +84,7 @@ func (cfg *CrawlDriverConfig) WriterConfig() *core.CrawlWriterConfig {
8184

8285
type CrawlDriver struct {
8386
cfg *CrawlDriverConfig
84-
host *basichost.BasicHost
87+
hosts []host.Host
8588
dbc db.Client
8689
dbCrawl *models.Crawl
8790
tasksChan chan PeerInfo
@@ -92,22 +95,13 @@ type CrawlDriver struct {
9295
var _ core.Driver[PeerInfo, core.CrawlResult[PeerInfo]] = (*CrawlDriver)(nil)
9396

9497
func NewCrawlDriver(dbc db.Client, dbCrawl *models.Crawl, cfg *CrawlDriverConfig) (*CrawlDriver, error) {
95-
// Configure the resource manager to not limit anything
96-
limiter := rcmgr.NewFixedLimiter(rcmgr.InfiniteLimits)
97-
rm, err := rcmgr.NewResourceManager(limiter)
98-
if err != nil {
99-
return nil, fmt.Errorf("new resource manager: %w", err)
100-
}
101-
102-
// Initialize a single libp2p node that's shared between all crawlers.
103-
h, err := libp2p.New(
104-
libp2p.NoListenAddrs,
105-
libp2p.ResourceManager(rm),
106-
libp2p.UserAgent("nebula/"+cfg.Version),
107-
libp2p.DisableMetrics(),
108-
)
109-
if err != nil {
110-
return nil, fmt.Errorf("new libp2p host: %w", err)
98+
hosts := make([]host.Host, 0, runtime.NumCPU())
99+
for i := 0; i < runtime.NumCPU(); i++ {
100+
h, err := newLibp2pHost(cfg.Version)
101+
if err != nil {
102+
return nil, fmt.Errorf("new libp2p host: %w", err)
103+
}
104+
hosts = append(hosts, h)
111105
}
112106

113107
peerAddrs := map[peer.ID][]ma.Multiaddr{}
@@ -146,7 +140,7 @@ func NewCrawlDriver(dbc db.Client, dbCrawl *models.Crawl, cfg *CrawlDriverConfig
146140

147141
return &CrawlDriver{
148142
cfg: cfg,
149-
host: h.(*basichost.BasicHost),
143+
hosts: hosts,
150144
dbc: dbc,
151145
dbCrawl: dbCrawl,
152146
tasksChan: tasksChan,
@@ -156,8 +150,10 @@ func NewCrawlDriver(dbc db.Client, dbCrawl *models.Crawl, cfg *CrawlDriverConfig
156150
}
157151

158152
func (d *CrawlDriver) NewWorker() (core.Worker[PeerInfo, core.CrawlResult[PeerInfo]], error) {
153+
h := d.hosts[d.crawlerCount%len(d.hosts)]
154+
159155
ms := &msgSender{
160-
h: d.host,
156+
h: h,
161157
protocols: protocol.ConvertFromStrings(d.cfg.Protocols),
162158
timeout: d.cfg.DialTimeout,
163159
}
@@ -169,7 +165,7 @@ func (d *CrawlDriver) NewWorker() (core.Worker[PeerInfo, core.CrawlResult[PeerIn
169165

170166
c := &Crawler{
171167
id: fmt.Sprintf("crawler-%02d", d.crawlerCount),
172-
host: d.host,
168+
host: h.(*basichost.BasicHost),
173169
pm: pm,
174170
cfg: d.cfg.CrawlerConfig(),
175171
client: kubo.NewClient(),
@@ -191,3 +187,21 @@ func (d *CrawlDriver) Tasks() <-chan PeerInfo {
191187
}
192188

193189
func (d *CrawlDriver) Close() {}
190+
191+
func newLibp2pHost(version string) (host.Host, error) {
192+
// Configure the resource manager to not limit anything
193+
limiter := rcmgr.NewFixedLimiter(rcmgr.InfiniteLimits)
194+
rm, err := rcmgr.NewResourceManager(limiter)
195+
if err != nil {
196+
return nil, fmt.Errorf("new resource manager: %w", err)
197+
}
198+
199+
// Initialize a libp2p host. One host is created per CPU core, and each is shared by a subset of the crawlers.
200+
return libp2p.New(
201+
libp2p.NoListenAddrs,
202+
libp2p.ResourceManager(rm),
203+
libp2p.UserAgent("nebula/"+version),
204+
libp2p.ConnectionManager(connmgr.NullConnMgr{}),
205+
libp2p.DisableMetrics(),
206+
)
207+
}

0 commit comments

Comments
 (0)