Skip to content
This repository was archived by the owner on Feb 1, 2023. It is now read-only.
85 changes: 75 additions & 10 deletions benchmarks_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,11 +15,13 @@ import (

"github.com/ipfs/go-bitswap/internal/testutil"
blocks "github.com/ipfs/go-block-format"
protocol "github.com/libp2p/go-libp2p-core/protocol"

bitswap "github.com/ipfs/go-bitswap"
bssession "github.com/ipfs/go-bitswap/internal/session"
testinstance "github.com/ipfs/go-bitswap/internal/testinstance"
tn "github.com/ipfs/go-bitswap/internal/testnet"
bsnet "github.com/ipfs/go-bitswap/network"
cid "github.com/ipfs/go-cid"
delay "github.com/ipfs/go-ipfs-delay"
mockrouting "github.com/ipfs/go-ipfs-routing/mock"
Expand Down Expand Up @@ -118,6 +120,74 @@ func BenchmarkFixedDelay(b *testing.B) {
printResults(benchmarkLog)
}

type mixedBench struct {
bench
fetcherCount int // number of nodes that fetch data
oldSeedCount int // number of seed nodes running old version of Bitswap
}

var mixedBenches = []mixedBench{
mixedBench{bench{"3Nodes-Overlap3-OneAtATime", 3, 10, overlap2, oneAtATime}, 1, 2},
mixedBench{bench{"3Nodes-AllToAll-OneAtATime", 3, 10, allToAll, oneAtATime}, 1, 2},
mixedBench{bench{"3Nodes-Overlap3-AllConcurrent", 3, 10, overlap2, fetchAllConcurrent}, 1, 2},
mixedBench{bench{"3Nodes-Overlap3-UnixfsFetch", 3, 100, overlap2, unixfsFileFetch}, 1, 2},
}

func BenchmarkFetchFromOldBitswap(b *testing.B) {
benchmarkLog = nil
fixedDelay := delay.Fixed(10 * time.Millisecond)
bstoreLatency := time.Duration(0)

for _, bch := range mixedBenches {
b.Run(bch.name, func(b *testing.B) {
fetcherCount := bch.fetcherCount
oldSeedCount := bch.oldSeedCount
newSeedCount := bch.nodeCount - (fetcherCount + oldSeedCount)

net := tn.VirtualNetwork(mockrouting.NewServer(), fixedDelay)

// Simulate an older Bitswap node (old protocol ID) that doesn't
// send DONT_HAVE responses
oldProtocol := []protocol.ID{bsnet.ProtocolBitswapOneOne}
oldNetOpts := []bsnet.NetOpt{bsnet.SupportedProtocols(oldProtocol)}
oldBsOpts := []bitswap.Option{bitswap.SetSendDontHaves(false)}
oldNodeGenerator := testinstance.NewTestInstanceGenerator(net, oldNetOpts, oldBsOpts)

// Regular new Bitswap node
newNodeGenerator := testinstance.NewTestInstanceGenerator(net, nil, nil)
var instances []testinstance.Instance

// Create new nodes (fetchers + seeds)
for i := 0; i < fetcherCount+newSeedCount; i++ {
inst := newNodeGenerator.Next()
instances = append(instances, inst)
}
// Create old nodes (just seeds)
for i := 0; i < oldSeedCount; i++ {
inst := oldNodeGenerator.Next()
instances = append(instances, inst)
}
// Connect all the nodes together
testinstance.ConnectInstances(instances)

// Generate blocks, with a smaller root block
rootBlock := testutil.GenerateBlocksOfSize(1, rootBlockSize)
blocks := testutil.GenerateBlocksOfSize(bch.blockCount, stdBlockSize)
blocks[0] = rootBlock[0]

// Run the distribution
runDistributionMulti(b, instances[:fetcherCount], instances[fetcherCount:], blocks, bstoreLatency, bch.distFn, bch.fetchFn)

newNodeGenerator.Close()
oldNodeGenerator.Close()
})
}

out, _ := json.MarshalIndent(benchmarkLog, "", " ")
_ = ioutil.WriteFile("tmp/benchmark.json", out, 0666)
printResults(benchmarkLog)
}

const datacenterSpeed = 5 * time.Millisecond
const fastSpeed = 60 * time.Millisecond
const mediumSpeed = 200 * time.Millisecond
Expand Down Expand Up @@ -226,12 +296,12 @@ func BenchmarkDatacenterMultiLeechMultiSeed(b *testing.B) {
for i := 0; i < b.N; i++ {
net := tn.RateLimitedVirtualNetwork(mockrouting.NewServer(), d, rateLimitGenerator)

ig := testinstance.NewTestInstanceGenerator(net)
ig := testinstance.NewTestInstanceGenerator(net, nil, nil)
defer ig.Close()

instances := ig.Instances(numnodes)
blocks := testutil.GenerateBlocksOfSize(numblks, blockSize)
runDistributionMulti(b, instances, 3, blocks, bstoreLatency, df, ff)
runDistributionMulti(b, instances[:3], instances[3:], blocks, bstoreLatency, df, ff)
}
})

Expand All @@ -244,23 +314,22 @@ func subtestDistributeAndFetch(b *testing.B, numnodes, numblks int, d delay.D, b
for i := 0; i < b.N; i++ {
net := tn.VirtualNetwork(mockrouting.NewServer(), d)

ig := testinstance.NewTestInstanceGenerator(net)
ig := testinstance.NewTestInstanceGenerator(net, nil, nil)

instances := ig.Instances(numnodes)
rootBlock := testutil.GenerateBlocksOfSize(1, rootBlockSize)
blocks := testutil.GenerateBlocksOfSize(numblks, stdBlockSize)
blocks[0] = rootBlock[0]
runDistribution(b, instances, blocks, bstoreLatency, df, ff)
ig.Close()
// panic("done")
}
}

func subtestDistributeAndFetchRateLimited(b *testing.B, numnodes, numblks int, d delay.D, rateLimitGenerator tn.RateLimitGenerator, blockSize int64, bstoreLatency time.Duration, df distFunc, ff fetchFunc) {
for i := 0; i < b.N; i++ {
net := tn.RateLimitedVirtualNetwork(mockrouting.NewServer(), d, rateLimitGenerator)

ig := testinstance.NewTestInstanceGenerator(net)
ig := testinstance.NewTestInstanceGenerator(net, nil, nil)
defer ig.Close()

instances := ig.Instances(numnodes)
Expand All @@ -271,12 +340,8 @@ func subtestDistributeAndFetchRateLimited(b *testing.B, numnodes, numblks int, d
}
}

func runDistributionMulti(b *testing.B, instances []testinstance.Instance, numFetchers int, blocks []blocks.Block, bstoreLatency time.Duration, df distFunc, ff fetchFunc) {
numnodes := len(instances)
fetchers := instances[numnodes-numFetchers:]

func runDistributionMulti(b *testing.B, fetchers []testinstance.Instance, seeds []testinstance.Instance, blocks []blocks.Block, bstoreLatency time.Duration, df distFunc, ff fetchFunc) {
// Distribute blocks to seed nodes
seeds := instances[:numnodes-numFetchers]
df(b, seeds, blocks)

// Set the blockstore latency on seed nodes
Expand Down
27 changes: 23 additions & 4 deletions bitswap.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,9 +14,7 @@ import (
bsbpm "github.com/ipfs/go-bitswap/internal/blockpresencemanager"
decision "github.com/ipfs/go-bitswap/internal/decision"
bsgetter "github.com/ipfs/go-bitswap/internal/getter"
bsmsg "github.com/ipfs/go-bitswap/message"
bsmq "github.com/ipfs/go-bitswap/internal/messagequeue"
bsnet "github.com/ipfs/go-bitswap/network"
notifications "github.com/ipfs/go-bitswap/internal/notifications"
bspm "github.com/ipfs/go-bitswap/internal/peermanager"
bspqm "github.com/ipfs/go-bitswap/internal/providerquerymanager"
Expand All @@ -25,6 +23,8 @@ import (
bssm "github.com/ipfs/go-bitswap/internal/sessionmanager"
bsspm "github.com/ipfs/go-bitswap/internal/sessionpeermanager"
bswm "github.com/ipfs/go-bitswap/internal/wantmanager"
bsmsg "github.com/ipfs/go-bitswap/message"
bsnet "github.com/ipfs/go-bitswap/network"
blocks "github.com/ipfs/go-block-format"
cid "github.com/ipfs/go-cid"
blockstore "github.com/ipfs/go-ipfs-blockstore"
Expand Down Expand Up @@ -84,6 +84,17 @@ func RebroadcastDelay(newRebroadcastDelay delay.D) Option {
}
}

// SetSendDontHaves indicates what to do when the engine receives a want-block
// for a block that is not in the blockstore. Either
// - Send a DONT_HAVE message
// - Simply don't respond
// This option is only used for testing.
func SetSendDontHaves(send bool) Option {
return func(bs *Bitswap) {
bs.engine.SetSendDontHaves(send)
}
}

// New initializes a BitSwap instance that communicates over the provided
// BitSwapNetwork. This function registers the returned instance as the network
// delegate. Runs until context is cancelled or bitswap.Close is called.
Expand Down Expand Up @@ -111,14 +122,22 @@ func New(parent context.Context, network bsnet.BitSwapNetwork,
return nil
})

var wm *bswm.WantManager
// onDontHaveTimeout is called when a want-block is sent to a peer that
// has an old version of Bitswap that doesn't support DONT_HAVE messages,
// and no response is received within a timeout.
onDontHaveTimeout := func(p peer.ID, dontHaves []cid.Cid) {
// Simulate a DONT_HAVE message arriving to the WantManager
wm.ReceiveFrom(ctx, p, nil, nil, dontHaves)
}
peerQueueFactory := func(ctx context.Context, p peer.ID) bspm.PeerQueue {
return bsmq.New(ctx, p, network)
return bsmq.New(ctx, p, network, onDontHaveTimeout)
}

sim := bssim.New()
bpm := bsbpm.New()
pm := bspm.New(ctx, peerQueueFactory, network.Self())
wm := bswm.New(ctx, pm, sim, bpm)
wm = bswm.New(ctx, pm, sim, bpm)
pqm := bspqm.New(ctx, network)

sessionFactory := func(ctx context.Context, id uint64, spm bssession.SessionPeerManager,
Expand Down
31 changes: 16 additions & 15 deletions bitswap_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,10 +10,10 @@ import (

bitswap "github.com/ipfs/go-bitswap"
decision "github.com/ipfs/go-bitswap/internal/decision"
"github.com/ipfs/go-bitswap/message"
bssession "github.com/ipfs/go-bitswap/internal/session"
testinstance "github.com/ipfs/go-bitswap/internal/testinstance"
tn "github.com/ipfs/go-bitswap/internal/testnet"
"github.com/ipfs/go-bitswap/message"
blocks "github.com/ipfs/go-block-format"
cid "github.com/ipfs/go-cid"
detectrace "github.com/ipfs/go-detect-race"
Expand All @@ -37,7 +37,7 @@ func getVirtualNetwork() tn.Network {

func TestClose(t *testing.T) {
vnet := getVirtualNetwork()
ig := testinstance.NewTestInstanceGenerator(vnet)
ig := testinstance.NewTestInstanceGenerator(vnet, nil, nil)
defer ig.Close()
bgen := blocksutil.NewBlockGenerator()

Expand All @@ -55,7 +55,7 @@ func TestProviderForKeyButNetworkCannotFind(t *testing.T) { // TODO revisit this

rs := mockrouting.NewServer()
net := tn.VirtualNetwork(rs, delay.Fixed(kNetworkDelay))
ig := testinstance.NewTestInstanceGenerator(net)
ig := testinstance.NewTestInstanceGenerator(net, nil, nil)
defer ig.Close()

block := blocks.NewBlock([]byte("block"))
Expand All @@ -81,7 +81,7 @@ func TestGetBlockFromPeerAfterPeerAnnounces(t *testing.T) {

net := tn.VirtualNetwork(mockrouting.NewServer(), delay.Fixed(kNetworkDelay))
block := blocks.NewBlock([]byte("block"))
ig := testinstance.NewTestInstanceGenerator(net)
ig := testinstance.NewTestInstanceGenerator(net, nil, nil)
defer ig.Close()

peers := ig.Instances(2)
Expand Down Expand Up @@ -111,7 +111,8 @@ func TestGetBlockFromPeerAfterPeerAnnounces(t *testing.T) {
func TestDoesNotProvideWhenConfiguredNotTo(t *testing.T) {
net := tn.VirtualNetwork(mockrouting.NewServer(), delay.Fixed(kNetworkDelay))
block := blocks.NewBlock([]byte("block"))
ig := testinstance.NewTestInstanceGenerator(net, bitswap.ProvideEnabled(false), bitswap.ProviderSearchDelay(50*time.Millisecond))
bsOpts := []bitswap.Option{bitswap.ProvideEnabled(false), bitswap.ProviderSearchDelay(50 * time.Millisecond)}
ig := testinstance.NewTestInstanceGenerator(net, nil, bsOpts)
defer ig.Close()

hasBlock := ig.Next()
Expand Down Expand Up @@ -148,7 +149,7 @@ func TestUnwantedBlockNotAdded(t *testing.T) {
bsMessage := message.New(true)
bsMessage.AddBlock(block)

ig := testinstance.NewTestInstanceGenerator(net)
ig := testinstance.NewTestInstanceGenerator(net, nil, nil)
defer ig.Close()

peers := ig.Instances(2)
Expand Down Expand Up @@ -184,7 +185,7 @@ func TestPendingBlockAdded(t *testing.T) {
bg := blocksutil.NewBlockGenerator()
sessionBroadcastWantCapacity := 4

ig := testinstance.NewTestInstanceGenerator(net)
ig := testinstance.NewTestInstanceGenerator(net, nil, nil)
defer ig.Close()

instance := ig.Instances(1)[0]
Expand Down Expand Up @@ -282,7 +283,7 @@ func PerformDistributionTest(t *testing.T, numInstances, numBlocks int) {
t.SkipNow()
}
net := tn.VirtualNetwork(mockrouting.NewServer(), delay.Fixed(kNetworkDelay))
ig := testinstance.NewTestInstanceGenerator(net)
ig := testinstance.NewTestInstanceGenerator(net, nil, nil)
defer ig.Close()
bg := blocksutil.NewBlockGenerator()

Expand Down Expand Up @@ -348,7 +349,7 @@ func TestSendToWantingPeer(t *testing.T) {
}

net := tn.VirtualNetwork(mockrouting.NewServer(), delay.Fixed(kNetworkDelay))
ig := testinstance.NewTestInstanceGenerator(net)
ig := testinstance.NewTestInstanceGenerator(net, nil, nil)
defer ig.Close()
bg := blocksutil.NewBlockGenerator()

Expand Down Expand Up @@ -390,7 +391,7 @@ func TestSendToWantingPeer(t *testing.T) {

func TestEmptyKey(t *testing.T) {
net := tn.VirtualNetwork(mockrouting.NewServer(), delay.Fixed(kNetworkDelay))
ig := testinstance.NewTestInstanceGenerator(net)
ig := testinstance.NewTestInstanceGenerator(net, nil, nil)
defer ig.Close()
bs := ig.Instances(1)[0].Exchange

Expand Down Expand Up @@ -423,7 +424,7 @@ func assertStat(t *testing.T, st *bitswap.Stat, sblks, rblks, sdata, rdata uint6

func TestBasicBitswap(t *testing.T) {
net := tn.VirtualNetwork(mockrouting.NewServer(), delay.Fixed(kNetworkDelay))
ig := testinstance.NewTestInstanceGenerator(net)
ig := testinstance.NewTestInstanceGenerator(net, nil, nil)
defer ig.Close()
bg := blocksutil.NewBlockGenerator()

Expand Down Expand Up @@ -499,7 +500,7 @@ func TestBasicBitswap(t *testing.T) {

func TestDoubleGet(t *testing.T) {
net := tn.VirtualNetwork(mockrouting.NewServer(), delay.Fixed(kNetworkDelay))
ig := testinstance.NewTestInstanceGenerator(net)
ig := testinstance.NewTestInstanceGenerator(net, nil, nil)
defer ig.Close()
bg := blocksutil.NewBlockGenerator()

Expand Down Expand Up @@ -567,7 +568,7 @@ func TestDoubleGet(t *testing.T) {

func TestWantlistCleanup(t *testing.T) {
net := tn.VirtualNetwork(mockrouting.NewServer(), delay.Fixed(kNetworkDelay))
ig := testinstance.NewTestInstanceGenerator(net)
ig := testinstance.NewTestInstanceGenerator(net, nil, nil)
defer ig.Close()
bg := blocksutil.NewBlockGenerator()

Expand Down Expand Up @@ -689,7 +690,7 @@ func newReceipt(sent, recv, exchanged uint64) *decision.Receipt {

func TestBitswapLedgerOneWay(t *testing.T) {
net := tn.VirtualNetwork(mockrouting.NewServer(), delay.Fixed(kNetworkDelay))
ig := testinstance.NewTestInstanceGenerator(net)
ig := testinstance.NewTestInstanceGenerator(net, nil, nil)
defer ig.Close()
bg := blocksutil.NewBlockGenerator()

Expand Down Expand Up @@ -741,7 +742,7 @@ func TestBitswapLedgerOneWay(t *testing.T) {

func TestBitswapLedgerTwoWay(t *testing.T) {
net := tn.VirtualNetwork(mockrouting.NewServer(), delay.Fixed(kNetworkDelay))
ig := testinstance.NewTestInstanceGenerator(net)
ig := testinstance.NewTestInstanceGenerator(net, nil, nil)
defer ig.Close()
bg := blocksutil.NewBlockGenerator()

Expand Down
Loading