diff --git a/benchmarks_test.go b/benchmarks_test.go index e56214d9..71e04629 100644 --- a/benchmarks_test.go +++ b/benchmarks_test.go @@ -15,11 +15,13 @@ import ( "github.com/ipfs/go-bitswap/internal/testutil" blocks "github.com/ipfs/go-block-format" + protocol "github.com/libp2p/go-libp2p-core/protocol" bitswap "github.com/ipfs/go-bitswap" bssession "github.com/ipfs/go-bitswap/internal/session" testinstance "github.com/ipfs/go-bitswap/internal/testinstance" tn "github.com/ipfs/go-bitswap/internal/testnet" + bsnet "github.com/ipfs/go-bitswap/network" cid "github.com/ipfs/go-cid" delay "github.com/ipfs/go-ipfs-delay" mockrouting "github.com/ipfs/go-ipfs-routing/mock" @@ -118,6 +120,74 @@ func BenchmarkFixedDelay(b *testing.B) { printResults(benchmarkLog) } +type mixedBench struct { + bench + fetcherCount int // number of nodes that fetch data + oldSeedCount int // number of seed nodes running old version of Bitswap +} + +var mixedBenches = []mixedBench{ + mixedBench{bench{"3Nodes-Overlap3-OneAtATime", 3, 10, overlap2, oneAtATime}, 1, 2}, + mixedBench{bench{"3Nodes-AllToAll-OneAtATime", 3, 10, allToAll, oneAtATime}, 1, 2}, + mixedBench{bench{"3Nodes-Overlap3-AllConcurrent", 3, 10, overlap2, fetchAllConcurrent}, 1, 2}, + mixedBench{bench{"3Nodes-Overlap3-UnixfsFetch", 3, 100, overlap2, unixfsFileFetch}, 1, 2}, +} + +func BenchmarkFetchFromOldBitswap(b *testing.B) { + benchmarkLog = nil + fixedDelay := delay.Fixed(10 * time.Millisecond) + bstoreLatency := time.Duration(0) + + for _, bch := range mixedBenches { + b.Run(bch.name, func(b *testing.B) { + fetcherCount := bch.fetcherCount + oldSeedCount := bch.oldSeedCount + newSeedCount := bch.nodeCount - (fetcherCount + oldSeedCount) + + net := tn.VirtualNetwork(mockrouting.NewServer(), fixedDelay) + + // Simulate an older Bitswap node (old protocol ID) that doesn't + // send DONT_HAVE responses + oldProtocol := []protocol.ID{bsnet.ProtocolBitswapOneOne} + oldNetOpts := []bsnet.NetOpt{bsnet.SupportedProtocols(oldProtocol)} + oldBsOpts := []bitswap.Option{bitswap.SetSendDontHaves(false)} + oldNodeGenerator := testinstance.NewTestInstanceGenerator(net, oldNetOpts, oldBsOpts) + + // Regular new Bitswap node + newNodeGenerator := testinstance.NewTestInstanceGenerator(net, nil, nil) + var instances []testinstance.Instance + + // Create new nodes (fetchers + seeds) + for i := 0; i < fetcherCount+newSeedCount; i++ { + inst := newNodeGenerator.Next() + instances = append(instances, inst) + } + // Create old nodes (just seeds) + for i := 0; i < oldSeedCount; i++ { + inst := oldNodeGenerator.Next() + instances = append(instances, inst) + } + // Connect all the nodes together + testinstance.ConnectInstances(instances) + + // Generate blocks, with a smaller root block + rootBlock := testutil.GenerateBlocksOfSize(1, rootBlockSize) + blocks := testutil.GenerateBlocksOfSize(bch.blockCount, stdBlockSize) + blocks[0] = rootBlock[0] + + // Run the distribution + runDistributionMulti(b, instances[:fetcherCount], instances[fetcherCount:], blocks, bstoreLatency, bch.distFn, bch.fetchFn) + + newNodeGenerator.Close() + oldNodeGenerator.Close() + }) + } + + out, _ := json.MarshalIndent(benchmarkLog, "", " ") + _ = ioutil.WriteFile("tmp/benchmark.json", out, 0666) + printResults(benchmarkLog) +} + const datacenterSpeed = 5 * time.Millisecond const fastSpeed = 60 * time.Millisecond const mediumSpeed = 200 * time.Millisecond @@ -226,12 +296,12 @@ func BenchmarkDatacenterMultiLeechMultiSeed(b *testing.B) { for i := 0; i < b.N; i++ { net := 
tn.RateLimitedVirtualNetwork(mockrouting.NewServer(), d, rateLimitGenerator) - ig := testinstance.NewTestInstanceGenerator(net) + ig := testinstance.NewTestInstanceGenerator(net, nil, nil) defer ig.Close() instances := ig.Instances(numnodes) blocks := testutil.GenerateBlocksOfSize(numblks, blockSize) - runDistributionMulti(b, instances, 3, blocks, bstoreLatency, df, ff) + runDistributionMulti(b, instances[:3], instances[3:], blocks, bstoreLatency, df, ff) } }) @@ -244,7 +314,7 @@ func subtestDistributeAndFetch(b *testing.B, numnodes, numblks int, d delay.D, b for i := 0; i < b.N; i++ { net := tn.VirtualNetwork(mockrouting.NewServer(), d) - ig := testinstance.NewTestInstanceGenerator(net) + ig := testinstance.NewTestInstanceGenerator(net, nil, nil) instances := ig.Instances(numnodes) rootBlock := testutil.GenerateBlocksOfSize(1, rootBlockSize) @@ -252,7 +322,6 @@ func subtestDistributeAndFetch(b *testing.B, numnodes, numblks int, d delay.D, b blocks[0] = rootBlock[0] runDistribution(b, instances, blocks, bstoreLatency, df, ff) ig.Close() - // panic("done") } } @@ -260,7 +329,7 @@ func subtestDistributeAndFetchRateLimited(b *testing.B, numnodes, numblks int, d for i := 0; i < b.N; i++ { net := tn.RateLimitedVirtualNetwork(mockrouting.NewServer(), d, rateLimitGenerator) - ig := testinstance.NewTestInstanceGenerator(net) + ig := testinstance.NewTestInstanceGenerator(net, nil, nil) defer ig.Close() instances := ig.Instances(numnodes) @@ -271,12 +340,8 @@ func subtestDistributeAndFetchRateLimited(b *testing.B, numnodes, numblks int, d } } -func runDistributionMulti(b *testing.B, instances []testinstance.Instance, numFetchers int, blocks []blocks.Block, bstoreLatency time.Duration, df distFunc, ff fetchFunc) { - numnodes := len(instances) - fetchers := instances[numnodes-numFetchers:] - +func runDistributionMulti(b *testing.B, fetchers []testinstance.Instance, seeds []testinstance.Instance, blocks []blocks.Block, bstoreLatency time.Duration, df distFunc, ff fetchFunc) { // Distribute blocks to seed nodes - seeds := instances[:numnodes-numFetchers] df(b, seeds, blocks) // Set the blockstore latency on seed nodes diff --git a/bitswap.go b/bitswap.go index 2bc7a189..e5e0ef14 100644 --- a/bitswap.go +++ b/bitswap.go @@ -14,9 +14,7 @@ import ( bsbpm "github.com/ipfs/go-bitswap/internal/blockpresencemanager" decision "github.com/ipfs/go-bitswap/internal/decision" bsgetter "github.com/ipfs/go-bitswap/internal/getter" - bsmsg "github.com/ipfs/go-bitswap/message" bsmq "github.com/ipfs/go-bitswap/internal/messagequeue" - bsnet "github.com/ipfs/go-bitswap/network" notifications "github.com/ipfs/go-bitswap/internal/notifications" bspm "github.com/ipfs/go-bitswap/internal/peermanager" bspqm "github.com/ipfs/go-bitswap/internal/providerquerymanager" @@ -25,6 +23,8 @@ import ( bssm "github.com/ipfs/go-bitswap/internal/sessionmanager" bsspm "github.com/ipfs/go-bitswap/internal/sessionpeermanager" bswm "github.com/ipfs/go-bitswap/internal/wantmanager" + bsmsg "github.com/ipfs/go-bitswap/message" + bsnet "github.com/ipfs/go-bitswap/network" blocks "github.com/ipfs/go-block-format" cid "github.com/ipfs/go-cid" blockstore "github.com/ipfs/go-ipfs-blockstore" @@ -84,6 +84,17 @@ func RebroadcastDelay(newRebroadcastDelay delay.D) Option { } } +// SetSendDontHaves indicates what to do when the engine receives a want-block +// for a block that is not in the blockstore. Either +// - Send a DONT_HAVE message +// - Simply don't respond +// This option is only used for testing. 
+func SetSendDontHaves(send bool) Option { + return func(bs *Bitswap) { + bs.engine.SetSendDontHaves(send) + } +} + // New initializes a BitSwap instance that communicates over the provided // BitSwapNetwork. This function registers the returned instance as the network // delegate. Runs until context is cancelled or bitswap.Close is called. @@ -111,14 +122,22 @@ func New(parent context.Context, network bsnet.BitSwapNetwork, return nil }) + var wm *bswm.WantManager + // onDontHaveTimeout is called when a want-block is sent to a peer that + // has an old version of Bitswap that doesn't support DONT_HAVE messages, + // and no response is received within a timeout. + onDontHaveTimeout := func(p peer.ID, dontHaves []cid.Cid) { + // Simulate a DONT_HAVE message arriving to the WantManager + wm.ReceiveFrom(ctx, p, nil, nil, dontHaves) + } peerQueueFactory := func(ctx context.Context, p peer.ID) bspm.PeerQueue { - return bsmq.New(ctx, p, network) + return bsmq.New(ctx, p, network, onDontHaveTimeout) } sim := bssim.New() bpm := bsbpm.New() pm := bspm.New(ctx, peerQueueFactory, network.Self()) - wm := bswm.New(ctx, pm, sim, bpm) + wm = bswm.New(ctx, pm, sim, bpm) pqm := bspqm.New(ctx, network) sessionFactory := func(ctx context.Context, id uint64, spm bssession.SessionPeerManager, diff --git a/bitswap_test.go b/bitswap_test.go index 723b25d6..0a0bcc98 100644 --- a/bitswap_test.go +++ b/bitswap_test.go @@ -10,10 +10,10 @@ import ( bitswap "github.com/ipfs/go-bitswap" decision "github.com/ipfs/go-bitswap/internal/decision" - "github.com/ipfs/go-bitswap/message" bssession "github.com/ipfs/go-bitswap/internal/session" testinstance "github.com/ipfs/go-bitswap/internal/testinstance" tn "github.com/ipfs/go-bitswap/internal/testnet" + "github.com/ipfs/go-bitswap/message" blocks "github.com/ipfs/go-block-format" cid "github.com/ipfs/go-cid" detectrace "github.com/ipfs/go-detect-race" @@ -37,7 +37,7 @@ func getVirtualNetwork() tn.Network { func TestClose(t *testing.T) { vnet := getVirtualNetwork() - ig := testinstance.NewTestInstanceGenerator(vnet) + ig := testinstance.NewTestInstanceGenerator(vnet, nil, nil) defer ig.Close() bgen := blocksutil.NewBlockGenerator() @@ -55,7 +55,7 @@ func TestProviderForKeyButNetworkCannotFind(t *testing.T) { // TODO revisit this rs := mockrouting.NewServer() net := tn.VirtualNetwork(rs, delay.Fixed(kNetworkDelay)) - ig := testinstance.NewTestInstanceGenerator(net) + ig := testinstance.NewTestInstanceGenerator(net, nil, nil) defer ig.Close() block := blocks.NewBlock([]byte("block")) @@ -81,7 +81,7 @@ func TestGetBlockFromPeerAfterPeerAnnounces(t *testing.T) { net := tn.VirtualNetwork(mockrouting.NewServer(), delay.Fixed(kNetworkDelay)) block := blocks.NewBlock([]byte("block")) - ig := testinstance.NewTestInstanceGenerator(net) + ig := testinstance.NewTestInstanceGenerator(net, nil, nil) defer ig.Close() peers := ig.Instances(2) @@ -111,7 +111,8 @@ func TestGetBlockFromPeerAfterPeerAnnounces(t *testing.T) { func TestDoesNotProvideWhenConfiguredNotTo(t *testing.T) { net := tn.VirtualNetwork(mockrouting.NewServer(), delay.Fixed(kNetworkDelay)) block := blocks.NewBlock([]byte("block")) - ig := testinstance.NewTestInstanceGenerator(net, bitswap.ProvideEnabled(false), bitswap.ProviderSearchDelay(50*time.Millisecond)) + bsOpts := []bitswap.Option{bitswap.ProvideEnabled(false), bitswap.ProviderSearchDelay(50 * time.Millisecond)} + ig := testinstance.NewTestInstanceGenerator(net, nil, bsOpts) defer ig.Close() hasBlock := ig.Next() @@ -148,7 +149,7 @@ func TestUnwantedBlockNotAdded(t 
*testing.T) { bsMessage := message.New(true) bsMessage.AddBlock(block) - ig := testinstance.NewTestInstanceGenerator(net) + ig := testinstance.NewTestInstanceGenerator(net, nil, nil) defer ig.Close() peers := ig.Instances(2) @@ -184,7 +185,7 @@ func TestPendingBlockAdded(t *testing.T) { bg := blocksutil.NewBlockGenerator() sessionBroadcastWantCapacity := 4 - ig := testinstance.NewTestInstanceGenerator(net) + ig := testinstance.NewTestInstanceGenerator(net, nil, nil) defer ig.Close() instance := ig.Instances(1)[0] @@ -282,7 +283,7 @@ func PerformDistributionTest(t *testing.T, numInstances, numBlocks int) { t.SkipNow() } net := tn.VirtualNetwork(mockrouting.NewServer(), delay.Fixed(kNetworkDelay)) - ig := testinstance.NewTestInstanceGenerator(net) + ig := testinstance.NewTestInstanceGenerator(net, nil, nil) defer ig.Close() bg := blocksutil.NewBlockGenerator() @@ -348,7 +349,7 @@ func TestSendToWantingPeer(t *testing.T) { } net := tn.VirtualNetwork(mockrouting.NewServer(), delay.Fixed(kNetworkDelay)) - ig := testinstance.NewTestInstanceGenerator(net) + ig := testinstance.NewTestInstanceGenerator(net, nil, nil) defer ig.Close() bg := blocksutil.NewBlockGenerator() @@ -390,7 +391,7 @@ func TestSendToWantingPeer(t *testing.T) { func TestEmptyKey(t *testing.T) { net := tn.VirtualNetwork(mockrouting.NewServer(), delay.Fixed(kNetworkDelay)) - ig := testinstance.NewTestInstanceGenerator(net) + ig := testinstance.NewTestInstanceGenerator(net, nil, nil) defer ig.Close() bs := ig.Instances(1)[0].Exchange @@ -423,7 +424,7 @@ func assertStat(t *testing.T, st *bitswap.Stat, sblks, rblks, sdata, rdata uint6 func TestBasicBitswap(t *testing.T) { net := tn.VirtualNetwork(mockrouting.NewServer(), delay.Fixed(kNetworkDelay)) - ig := testinstance.NewTestInstanceGenerator(net) + ig := testinstance.NewTestInstanceGenerator(net, nil, nil) defer ig.Close() bg := blocksutil.NewBlockGenerator() @@ -499,7 +500,7 @@ func TestBasicBitswap(t *testing.T) { func TestDoubleGet(t *testing.T) { net := tn.VirtualNetwork(mockrouting.NewServer(), delay.Fixed(kNetworkDelay)) - ig := testinstance.NewTestInstanceGenerator(net) + ig := testinstance.NewTestInstanceGenerator(net, nil, nil) defer ig.Close() bg := blocksutil.NewBlockGenerator() @@ -567,7 +568,7 @@ func TestDoubleGet(t *testing.T) { func TestWantlistCleanup(t *testing.T) { net := tn.VirtualNetwork(mockrouting.NewServer(), delay.Fixed(kNetworkDelay)) - ig := testinstance.NewTestInstanceGenerator(net) + ig := testinstance.NewTestInstanceGenerator(net, nil, nil) defer ig.Close() bg := blocksutil.NewBlockGenerator() @@ -689,7 +690,7 @@ func newReceipt(sent, recv, exchanged uint64) *decision.Receipt { func TestBitswapLedgerOneWay(t *testing.T) { net := tn.VirtualNetwork(mockrouting.NewServer(), delay.Fixed(kNetworkDelay)) - ig := testinstance.NewTestInstanceGenerator(net) + ig := testinstance.NewTestInstanceGenerator(net, nil, nil) defer ig.Close() bg := blocksutil.NewBlockGenerator() @@ -741,7 +742,7 @@ func TestBitswapLedgerOneWay(t *testing.T) { func TestBitswapLedgerTwoWay(t *testing.T) { net := tn.VirtualNetwork(mockrouting.NewServer(), delay.Fixed(kNetworkDelay)) - ig := testinstance.NewTestInstanceGenerator(net) + ig := testinstance.NewTestInstanceGenerator(net, nil, nil) defer ig.Close() bg := blocksutil.NewBlockGenerator() diff --git a/bitswap_with_sessions_test.go b/bitswap_with_sessions_test.go index 49e6d273..28d3a325 100644 --- a/bitswap_with_sessions_test.go +++ b/bitswap_with_sessions_test.go @@ -20,7 +20,7 @@ func TestBasicSessions(t *testing.T) { defer 
cancel() vnet := getVirtualNetwork() - ig := testinstance.NewTestInstanceGenerator(vnet) + ig := testinstance.NewTestInstanceGenerator(vnet, nil, nil) defer ig.Close() bgen := blocksutil.NewBlockGenerator() @@ -71,7 +71,7 @@ func TestSessionBetweenPeers(t *testing.T) { defer cancel() vnet := getVirtualNetwork() - ig := testinstance.NewTestInstanceGenerator(vnet) + ig := testinstance.NewTestInstanceGenerator(vnet, nil, nil) defer ig.Close() bgen := blocksutil.NewBlockGenerator() @@ -127,7 +127,7 @@ func TestSessionSplitFetch(t *testing.T) { defer cancel() vnet := getVirtualNetwork() - ig := testinstance.NewTestInstanceGenerator(vnet) + ig := testinstance.NewTestInstanceGenerator(vnet, nil, nil) defer ig.Close() bgen := blocksutil.NewBlockGenerator() @@ -171,7 +171,7 @@ func TestFetchNotConnected(t *testing.T) { defer cancel() vnet := getVirtualNetwork() - ig := testinstance.NewTestInstanceGenerator(vnet, bitswap.ProviderSearchDelay(10*time.Millisecond)) + ig := testinstance.NewTestInstanceGenerator(vnet, nil, []bitswap.Option{bitswap.ProviderSearchDelay(10 * time.Millisecond)}) defer ig.Close() bgen := blocksutil.NewBlockGenerator() @@ -216,7 +216,7 @@ func TestFetchAfterDisconnect(t *testing.T) { defer cancel() vnet := getVirtualNetwork() - ig := testinstance.NewTestInstanceGenerator(vnet, bitswap.ProviderSearchDelay(10*time.Millisecond)) + ig := testinstance.NewTestInstanceGenerator(vnet, nil, []bitswap.Option{bitswap.ProviderSearchDelay(10 * time.Millisecond)}) defer ig.Close() bgen := blocksutil.NewBlockGenerator() @@ -290,7 +290,7 @@ func TestInterestCacheOverflow(t *testing.T) { defer cancel() vnet := getVirtualNetwork() - ig := testinstance.NewTestInstanceGenerator(vnet) + ig := testinstance.NewTestInstanceGenerator(vnet, nil, nil) defer ig.Close() bgen := blocksutil.NewBlockGenerator() @@ -342,7 +342,7 @@ func TestPutAfterSessionCacheEvict(t *testing.T) { defer cancel() vnet := getVirtualNetwork() - ig := testinstance.NewTestInstanceGenerator(vnet) + ig := testinstance.NewTestInstanceGenerator(vnet, nil, nil) defer ig.Close() bgen := blocksutil.NewBlockGenerator() @@ -382,7 +382,7 @@ func TestMultipleSessions(t *testing.T) { defer cancel() vnet := getVirtualNetwork() - ig := testinstance.NewTestInstanceGenerator(vnet) + ig := testinstance.NewTestInstanceGenerator(vnet, nil, nil) defer ig.Close() bgen := blocksutil.NewBlockGenerator() @@ -425,7 +425,7 @@ func TestWantlistClearsOnCancel(t *testing.T) { defer cancel() vnet := getVirtualNetwork() - ig := testinstance.NewTestInstanceGenerator(vnet) + ig := testinstance.NewTestInstanceGenerator(vnet, nil, nil) defer ig.Close() bgen := blocksutil.NewBlockGenerator() diff --git a/internal/decision/engine.go b/internal/decision/engine.go index 2e183b06..bf51beae 100644 --- a/internal/decision/engine.go +++ b/internal/decision/engine.go @@ -161,6 +161,8 @@ type Engine struct { // bytes up to which we will replace a want-have with a want-block maxBlockSizeReplaceHasWithBlock int + sendDontHaves bool + self peer.ID } @@ -180,6 +182,7 @@ func newEngine(ctx context.Context, bs bstore.Blockstore, peerTagger PeerTagger, ticker: time.NewTicker(time.Millisecond * 100), maxBlockSizeReplaceHasWithBlock: maxReplaceSize, taskWorkerCount: taskWorkerCount, + sendDontHaves: true, self: self, } e.tagQueued = fmt.Sprintf(tagFormat, "queued", uuid.New().String()) @@ -193,6 +196,16 @@ func newEngine(ctx context.Context, bs bstore.Blockstore, peerTagger PeerTagger, return e } +// SetSendDontHaves indicates what to do when the engine receives a want-block +// for 
a block that is not in the blockstore. Either +// - Send a DONT_HAVE message +// - Simply don't respond +// Older versions of Bitswap did not respond, so this allows us to simulate +// those older versions for testing. +func (e *Engine) SetSendDontHaves(send bool) { + e.sendDontHaves = send +} + // Start up workers to handle requests from other nodes for the data on this node func (e *Engine) StartWorkers(ctx context.Context, px process.Process) { // Start up blockstore manager @@ -563,7 +576,7 @@ func (e *Engine) MessageReceived(ctx context.Context, p peer.ID, m bsmsg.BitSwap // If the block was not found if !found { // Only add the task to the queue if the requester wants a DONT_HAVE - if entry.SendDontHave { + if e.sendDontHaves && entry.SendDontHave { newWorkExists = true isWantBlock := false if entry.WantType == pb.Message_Wantlist_Block { diff --git a/internal/messagequeue/donthavetimeoutmgr.go b/internal/messagequeue/donthavetimeoutmgr.go new file mode 100644 index 00000000..ee7941b6 --- /dev/null +++ b/internal/messagequeue/donthavetimeoutmgr.go @@ -0,0 +1,304 @@ +package messagequeue + +import ( + "context" + "sync" + "time" + + cid "github.com/ipfs/go-cid" + "github.com/libp2p/go-libp2p/p2p/protocol/ping" +) + +const ( + // dontHaveTimeout is used to simulate a DONT_HAVE when communicating with + // a peer whose Bitswap client doesn't support the DONT_HAVE response. + // If the peer doesn't respond to a want-block within the timeout, the + // local node assumes that the peer doesn't have the block. + dontHaveTimeout = 5 * time.Second + + // maxExpectedWantProcessTime is the maximum amount of time we expect a + // peer takes to process a want and initiate sending a response to us + maxExpectedWantProcessTime = 200 * time.Millisecond + + // latencyMultiplier is multiplied by the average ping time to + // get an upper bound on how long we expect to wait for a peer's response + // to arrive + latencyMultiplier = 2 +) + +// PeerConnection is a connection to a peer that can be pinged, and the +// average latency measured +type PeerConnection interface { + // Ping the peer + Ping(context.Context) ping.Result + // The average latency of all pings + Latency() time.Duration +} + +// pendingWant keeps track of a want that has been sent and we're waiting +// for a response or for a timeout to expire +type pendingWant struct { + c cid.Cid + active bool + sent time.Time +} + +// dontHaveTimeoutMgr pings the peer to measure latency. 
It uses the latency to +// set a reasonable timeout for simulating a DONT_HAVE message for peers that +// don't support DONT_HAVE +type dontHaveTimeoutMgr struct { + ctx context.Context + shutdown func() + peerConn PeerConnection + onDontHaveTimeout func([]cid.Cid) + defaultTimeout time.Duration + latencyMultiplier int + maxExpectedWantProcessTime time.Duration + + // All variables below here must be protected by the lock + lk sync.RWMutex + // has the timeout manager started + started bool + // wants that are active (waiting for a response or timeout) + activeWants map[cid.Cid]*pendingWant + // queue of wants, from oldest to newest + wantQueue []*pendingWant + // time to wait for a response (depends on latency) + timeout time.Duration + // timer used to wait until want at front of queue expires + checkForTimeoutsTimer *time.Timer +} + +// newDontHaveTimeoutMgr creates a new dontHaveTimeoutMgr +// onDontHaveTimeout is called when pending keys expire (not cancelled before timeout) +func newDontHaveTimeoutMgr(ctx context.Context, pc PeerConnection, onDontHaveTimeout func([]cid.Cid)) *dontHaveTimeoutMgr { + return newDontHaveTimeoutMgrWithParams(ctx, pc, onDontHaveTimeout, dontHaveTimeout, + latencyMultiplier, maxExpectedWantProcessTime) +} + +// newDontHaveTimeoutMgrWithParams is used by the tests +func newDontHaveTimeoutMgrWithParams(ctx context.Context, pc PeerConnection, onDontHaveTimeout func([]cid.Cid), + defaultTimeout time.Duration, latencyMultiplier int, + maxExpectedWantProcessTime time.Duration) *dontHaveTimeoutMgr { + + ctx, shutdown := context.WithCancel(ctx) + mqp := &dontHaveTimeoutMgr{ + ctx: ctx, + shutdown: shutdown, + peerConn: pc, + activeWants: make(map[cid.Cid]*pendingWant), + timeout: defaultTimeout, + defaultTimeout: defaultTimeout, + latencyMultiplier: latencyMultiplier, + maxExpectedWantProcessTime: maxExpectedWantProcessTime, + onDontHaveTimeout: onDontHaveTimeout, + } + + return mqp +} + +// Shutdown the dontHaveTimeoutMgr. Any subsequent call to Start() will be ignored +func (dhtm *dontHaveTimeoutMgr) Shutdown() { + dhtm.shutdown() +} + +// onShutdown is called when the dontHaveTimeoutMgr shuts down +func (dhtm *dontHaveTimeoutMgr) onShutdown() { + dhtm.lk.Lock() + defer dhtm.lk.Unlock() + + // Clear any pending check for timeouts + if dhtm.checkForTimeoutsTimer != nil { + dhtm.checkForTimeoutsTimer.Stop() + } +} + +// closeAfterContext is called when the dontHaveTimeoutMgr starts. +// It monitors for the context being cancelled. +func (dhtm *dontHaveTimeoutMgr) closeAfterContext() { + <-dhtm.ctx.Done() + dhtm.onShutdown() +} + +// Start the dontHaveTimeoutMgr. 
This method is idempotent +func (dhtm *dontHaveTimeoutMgr) Start() { + dhtm.lk.Lock() + defer dhtm.lk.Unlock() + + // Make sure the dont have timeout manager hasn't already been started + if dhtm.started { + return + } + dhtm.started = true + + go dhtm.closeAfterContext() + + // If we already have a measure of latency to the peer, use it to + // calculate a reasonable timeout + latency := dhtm.peerConn.Latency() + if latency.Nanoseconds() > 0 { + dhtm.timeout = dhtm.calculateTimeoutFromLatency(latency) + return + } + + // Otherwise measure latency by pinging the peer + go dhtm.measureLatency() +} + +// measureLatency measures the latency to the peer by pinging it +func (dhtm *dontHaveTimeoutMgr) measureLatency() { + // Wait up to defaultTimeout for a response to the ping + ctx, cancel := context.WithTimeout(dhtm.ctx, dhtm.defaultTimeout) + defer cancel() + + // Ping the peer + res := dhtm.peerConn.Ping(ctx) + if res.Error != nil { + // If there was an error, we'll just leave the timeout as + // defaultTimeout + return + } + + // Get the average latency to the peer + latency := dhtm.peerConn.Latency() + + dhtm.lk.Lock() + defer dhtm.lk.Unlock() + + // Calculate a reasonable timeout based on latency + dhtm.timeout = dhtm.calculateTimeoutFromLatency(latency) + + // Check if after changing the timeout there are any pending wants that are + // now over the timeout + dhtm.checkForTimeouts() +} + +// checkForTimeouts checks pending wants to see if any are over the timeout. +// Note: this function should only be called within the lock. +func (dhtm *dontHaveTimeoutMgr) checkForTimeouts() { + if len(dhtm.wantQueue) == 0 { + return + } + + // Figure out which of the blocks that were wanted were not received + // within the timeout + expired := make([]cid.Cid, 0, len(dhtm.activeWants)) + for len(dhtm.wantQueue) > 0 { + pw := dhtm.wantQueue[0] + + // If the want is still active + if pw.active { + // The queue is in order from earliest to latest, so if we + // didn't find an expired entry we can stop iterating + if time.Since(pw.sent) < dhtm.timeout { + break + } + + // Add the want to the expired list + expired = append(expired, pw.c) + // Remove the want from the activeWants map + delete(dhtm.activeWants, pw.c) + } + + // Remove expired or cancelled wants from the want queue + dhtm.wantQueue = dhtm.wantQueue[1:] + } + + // Fire the timeout event for the expired wants + if len(expired) > 0 { + go dhtm.fireTimeout(expired) + } + + if len(dhtm.wantQueue) == 0 { + return + } + + // Make sure the timeout manager is still running + if dhtm.ctx.Err() != nil { + return + } + + // Schedule the next check for the moment when the oldest pending want will + // timeout + oldestStart := dhtm.wantQueue[0].sent + until := time.Until(oldestStart.Add(dhtm.timeout)) + if dhtm.checkForTimeoutsTimer == nil { + dhtm.checkForTimeoutsTimer = time.AfterFunc(until, func() { + dhtm.lk.Lock() + defer dhtm.lk.Unlock() + + dhtm.checkForTimeouts() + }) + } else { + dhtm.checkForTimeoutsTimer.Stop() + dhtm.checkForTimeoutsTimer.Reset(until) + } +} + +// AddPending adds the given keys that will expire if not cancelled before +// the timeout +func (dhtm *dontHaveTimeoutMgr) AddPending(ks []cid.Cid) { + if len(ks) == 0 { + return + } + + start := time.Now() + + dhtm.lk.Lock() + defer dhtm.lk.Unlock() + + queueWasEmpty := len(dhtm.activeWants) == 0 + + // Record the start time for each key + for _, c := range ks { + if _, ok := dhtm.activeWants[c]; !ok { + pw := pendingWant{ + c: c, + sent: start, + active: true, + } + 
dhtm.activeWants[c] = &pw + dhtm.wantQueue = append(dhtm.wantQueue, &pw) + } + } + + // If there was already an earlier pending item in the queue, then there + // must already be a timeout check scheduled. If there is nothing in the + // queue then we should make sure to schedule a check. + if queueWasEmpty { + dhtm.checkForTimeouts() + } +} + +// CancelPending is called when we receive a response for a key +func (dhtm *dontHaveTimeoutMgr) CancelPending(ks []cid.Cid) { + dhtm.lk.Lock() + defer dhtm.lk.Unlock() + + // Mark the wants as cancelled + for _, c := range ks { + if pw, ok := dhtm.activeWants[c]; ok { + pw.active = false + delete(dhtm.activeWants, c) + } + } +} + +// fireTimeout fires the onDontHaveTimeout method with the timed out keys +func (dhtm *dontHaveTimeoutMgr) fireTimeout(pending []cid.Cid) { + // Make sure the timeout manager has not been shut down + if dhtm.ctx.Err() != nil { + return + } + + // Fire the timeout + dhtm.onDontHaveTimeout(pending) +} + +// calculateTimeoutFromLatency calculates a reasonable timeout derived from latency +func (dhtm *dontHaveTimeoutMgr) calculateTimeoutFromLatency(latency time.Duration) time.Duration { + // The maximum expected time for a response is + // the expected time to process the want + (latency * multiplier) + // The multiplier is to provide some padding for variable latency. + return dhtm.maxExpectedWantProcessTime + time.Duration(dhtm.latencyMultiplier)*latency +} diff --git a/internal/messagequeue/donthavetimeoutmgr_test.go b/internal/messagequeue/donthavetimeoutmgr_test.go new file mode 100644 index 00000000..78e622a7 --- /dev/null +++ b/internal/messagequeue/donthavetimeoutmgr_test.go @@ -0,0 +1,314 @@ +package messagequeue + +import ( + "context" + "fmt" + "sync" + "testing" + "time" + + "github.com/ipfs/go-bitswap/internal/testutil" + cid "github.com/ipfs/go-cid" + "github.com/libp2p/go-libp2p/p2p/protocol/ping" +) + +type mockPeerConn struct { + err error + latency time.Duration + latencies []time.Duration +} + +func (pc *mockPeerConn) Ping(ctx context.Context) ping.Result { + timer := time.NewTimer(pc.latency) + select { + case <-timer.C: + if pc.err != nil { + return ping.Result{Error: pc.err} + } + pc.latencies = append(pc.latencies, pc.latency) + case <-ctx.Done(): + } + return ping.Result{RTT: pc.latency} +} + +func (pc *mockPeerConn) Latency() time.Duration { + sum := time.Duration(0) + if len(pc.latencies) == 0 { + return sum + } + for _, l := range pc.latencies { + sum += l + } + return sum / time.Duration(len(pc.latencies)) +} + +type timeoutRecorder struct { + timedOutKs []cid.Cid + lk sync.Mutex +} + +func (tr *timeoutRecorder) onTimeout(tks []cid.Cid) { + tr.lk.Lock() + defer tr.lk.Unlock() + tr.timedOutKs = append(tr.timedOutKs, tks...) +} + +func TestDontHaveTimeoutMgrTimeout(t *testing.T) { + firstks := testutil.GenerateCids(2) + secondks := append(firstks, testutil.GenerateCids(3)...) 
+ latency := time.Millisecond * 10 + latMultiplier := 2 + expProcessTime := 5 * time.Millisecond + expectedTimeout := expProcessTime + latency*time.Duration(latMultiplier) + ctx := context.Background() + pc := &mockPeerConn{latency: latency} + tr := timeoutRecorder{} + + dhtm := newDontHaveTimeoutMgrWithParams(ctx, pc, tr.onTimeout, + dontHaveTimeout, latMultiplier, expProcessTime) + dhtm.Start() + + // Add first set of keys + dhtm.AddPending(firstks) + + // Wait for less than the expected timeout + time.Sleep(expectedTimeout - 5*time.Millisecond) + + // At this stage no keys should have timed out + if len(tr.timedOutKs) > 0 { + t.Fatal("expected timeout not to have happened yet") + } + + // Add second set of keys + dhtm.AddPending(secondks) + + // Wait until after the expected timeout + time.Sleep(10 * time.Millisecond) + + // At this stage first set of keys should have timed out + if len(tr.timedOutKs) != len(firstks) { + t.Fatal("expected timeout") + } + + // Clear the recorded timed out keys + tr.timedOutKs = nil + + // Sleep until the second set of keys should have timed out + time.Sleep(expectedTimeout) + + // At this stage all keys should have timed out. The second set included + // the first set of keys, but they were added before the first set timed + // out, so only the remaining keys should have beed added. + if len(tr.timedOutKs) != len(secondks)-len(firstks) { + t.Fatal("expected second set of keys to timeout") + } +} + +func TestDontHaveTimeoutMgrCancel(t *testing.T) { + ks := testutil.GenerateCids(3) + latency := time.Millisecond * 10 + latMultiplier := 1 + expProcessTime := time.Duration(0) + expectedTimeout := latency + ctx := context.Background() + pc := &mockPeerConn{latency: latency} + tr := timeoutRecorder{} + + dhtm := newDontHaveTimeoutMgrWithParams(ctx, pc, tr.onTimeout, + dontHaveTimeout, latMultiplier, expProcessTime) + dhtm.Start() + + // Add keys + dhtm.AddPending(ks) + time.Sleep(5 * time.Millisecond) + + // Cancel keys + cancelCount := 1 + dhtm.CancelPending(ks[:cancelCount]) + + // Wait for the expected timeout + time.Sleep(expectedTimeout) + + // At this stage all non-cancelled keys should have timed out + if len(tr.timedOutKs) != len(ks)-cancelCount { + t.Fatal("expected timeout") + } +} + +func TestDontHaveTimeoutWantCancelWant(t *testing.T) { + ks := testutil.GenerateCids(3) + latency := time.Millisecond * 20 + latMultiplier := 1 + expProcessTime := time.Duration(0) + expectedTimeout := latency + ctx := context.Background() + pc := &mockPeerConn{latency: latency} + tr := timeoutRecorder{} + + dhtm := newDontHaveTimeoutMgrWithParams(ctx, pc, tr.onTimeout, + dontHaveTimeout, latMultiplier, expProcessTime) + dhtm.Start() + + // Add keys + dhtm.AddPending(ks) + + // Wait for a short time + time.Sleep(expectedTimeout - 10*time.Millisecond) + + // Cancel two keys + dhtm.CancelPending(ks[:2]) + + time.Sleep(5 * time.Millisecond) + + // Add back one cancelled key + dhtm.AddPending(ks[:1]) + + // Wait till after initial timeout + time.Sleep(10 * time.Millisecond) + + // At this stage only the key that was never cancelled should have timed out + if len(tr.timedOutKs) != 1 { + t.Fatal("expected one key to timeout") + } + + // Wait till after added back key should time out + time.Sleep(latency) + + // At this stage the key that was added back should also have timed out + if len(tr.timedOutKs) != 2 { + t.Fatal("expected added back key to timeout") + } +} + +func TestDontHaveTimeoutRepeatedAddPending(t *testing.T) { + ks := testutil.GenerateCids(10) + latency := 
time.Millisecond * 5 + latMultiplier := 1 + expProcessTime := time.Duration(0) + ctx := context.Background() + pc := &mockPeerConn{latency: latency} + tr := timeoutRecorder{} + + dhtm := newDontHaveTimeoutMgrWithParams(ctx, pc, tr.onTimeout, + dontHaveTimeout, latMultiplier, expProcessTime) + dhtm.Start() + + // Add keys repeatedly + for _, c := range ks { + dhtm.AddPending([]cid.Cid{c}) + } + + // Wait for the expected timeout + time.Sleep(latency + 5*time.Millisecond) + + // At this stage all keys should have timed out + if len(tr.timedOutKs) != len(ks) { + t.Fatal("expected timeout") + } +} + +func TestDontHaveTimeoutMgrUsesDefaultTimeoutIfPingError(t *testing.T) { + ks := testutil.GenerateCids(2) + latency := time.Millisecond * 1 + latMultiplier := 2 + expProcessTime := 2 * time.Millisecond + defaultTimeout := 10 * time.Millisecond + expectedTimeout := expProcessTime + defaultTimeout + tr := timeoutRecorder{} + ctx := context.Background() + pc := &mockPeerConn{latency: latency, err: fmt.Errorf("ping error")} + + dhtm := newDontHaveTimeoutMgrWithParams(ctx, pc, tr.onTimeout, + defaultTimeout, latMultiplier, expProcessTime) + dhtm.Start() + + // Add keys + dhtm.AddPending(ks) + + // Sleep for less than the expected timeout + time.Sleep(expectedTimeout - 5*time.Millisecond) + + // At this stage no timeout should have happened yet + if len(tr.timedOutKs) > 0 { + t.Fatal("expected timeout not to have happened yet") + } + + // Sleep until after the expected timeout + time.Sleep(10 * time.Millisecond) + + // Now the keys should have timed out + if len(tr.timedOutKs) != len(ks) { + t.Fatal("expected timeout") + } +} + +func TestDontHaveTimeoutMgrUsesDefaultTimeoutIfLatencyLonger(t *testing.T) { + ks := testutil.GenerateCids(2) + latency := time.Millisecond * 20 + latMultiplier := 1 + expProcessTime := time.Duration(0) + defaultTimeout := 10 * time.Millisecond + tr := timeoutRecorder{} + ctx := context.Background() + pc := &mockPeerConn{latency: latency} + + dhtm := newDontHaveTimeoutMgrWithParams(ctx, pc, tr.onTimeout, + defaultTimeout, latMultiplier, expProcessTime) + dhtm.Start() + + // Add keys + dhtm.AddPending(ks) + + // Sleep for less than the default timeout + time.Sleep(defaultTimeout - 5*time.Millisecond) + + // At this stage no timeout should have happened yet + if len(tr.timedOutKs) > 0 { + t.Fatal("expected timeout not to have happened yet") + } + + // Sleep until after the default timeout + time.Sleep(10 * time.Millisecond) + + // Now the keys should have timed out + if len(tr.timedOutKs) != len(ks) { + t.Fatal("expected timeout") + } +} + +func TestDontHaveTimeoutNoTimeoutAfterShutdown(t *testing.T) { + ks := testutil.GenerateCids(2) + latency := time.Millisecond * 10 + latMultiplier := 1 + expProcessTime := time.Duration(0) + ctx := context.Background() + pc := &mockPeerConn{latency: latency} + + var lk sync.Mutex + var timedOutKs []cid.Cid + onTimeout := func(tks []cid.Cid) { + lk.Lock() + defer lk.Unlock() + timedOutKs = append(timedOutKs, tks...) 
+ } + dhtm := newDontHaveTimeoutMgrWithParams(ctx, pc, onTimeout, + dontHaveTimeout, latMultiplier, expProcessTime) + dhtm.Start() + + // Add keys + dhtm.AddPending(ks) + + // Wait less than the timeout + time.Sleep(latency - 5*time.Millisecond) + + // Shutdown the manager + dhtm.Shutdown() + + // Wait for the expected timeout + time.Sleep(10 * time.Millisecond) + + // Manager was shut down so timeout should not have fired + if len(timedOutKs) != 0 { + t.Fatal("expected no timeout after shutdown") + } +} diff --git a/internal/messagequeue/messagequeue.go b/internal/messagequeue/messagequeue.go index b8caad57..15f8100d 100644 --- a/internal/messagequeue/messagequeue.go +++ b/internal/messagequeue/messagequeue.go @@ -15,6 +15,7 @@ import ( cid "github.com/ipfs/go-cid" logging "github.com/ipfs/go-log" peer "github.com/libp2p/go-libp2p-core/peer" + "github.com/libp2p/go-libp2p/p2p/protocol/ping" ) var log = logging.Logger("bitswap") @@ -40,7 +41,8 @@ const ( type MessageNetwork interface { ConnectTo(context.Context, peer.ID) error NewMessageSender(context.Context, peer.ID) (bsnet.MessageSender, error) - Self() peer.ID + Latency(peer.ID) time.Duration + Ping(context.Context, peer.ID) ping.Result } // MessageQueue implements queue of want messages to send to peers. @@ -48,6 +50,7 @@ type MessageQueue struct { ctx context.Context p peer.ID network MessageNetwork + dhTimeoutMgr DontHaveTimeoutManager maxMessageSize int sendErrorBackoff time.Duration @@ -104,17 +107,60 @@ func (r *recallWantlist) RemoveType(c cid.Cid, wtype pb.Message_Wantlist_WantTyp r.pending.RemoveType(c, wtype) } -// New creats a new MessageQueue. -func New(ctx context.Context, p peer.ID, network MessageNetwork) *MessageQueue { - return newMessageQueue(ctx, p, network, maxMessageSize, sendErrorBackoff) +type peerConn struct { + p peer.ID + network MessageNetwork +} + +func newPeerConnection(p peer.ID, network MessageNetwork) *peerConn { + return &peerConn{p, network} +} + +func (pc *peerConn) Ping(ctx context.Context) ping.Result { + return pc.network.Ping(ctx, pc.p) +} + +func (pc *peerConn) Latency() time.Duration { + return pc.network.Latency(pc.p) +} + +// Fires when a timeout occurs waiting for a response from a peer running an +// older version of Bitswap that doesn't support DONT_HAVE messages. +type OnDontHaveTimeout func(peer.ID, []cid.Cid) + +// DontHaveTimeoutManager pings a peer to estimate latency so it can set a reasonable +// upper bound on when to consider a DONT_HAVE request as timed out (when connected to +// a peer that doesn't support DONT_HAVE messages) +type DontHaveTimeoutManager interface { + // Start the manager (idempotent) + Start() + // Shutdown the manager (Shutdown is final, manager cannot be restarted) + Shutdown() + // AddPending adds the wants as pending a response. If the are not + // cancelled before the timeout, the OnDontHaveTimeout method will be called. + AddPending([]cid.Cid) + // CancelPending removes the wants + CancelPending([]cid.Cid) +} + +// New creates a new MessageQueue. 
+func New(ctx context.Context, p peer.ID, network MessageNetwork, onDontHaveTimeout OnDontHaveTimeout) *MessageQueue { + onTimeout := func(ks []cid.Cid) { + onDontHaveTimeout(p, ks) + } + dhTimeoutMgr := newDontHaveTimeoutMgr(ctx, newPeerConnection(p, network), onTimeout) + return newMessageQueue(ctx, p, network, maxMessageSize, sendErrorBackoff, dhTimeoutMgr) } // This constructor is used by the tests -func newMessageQueue(ctx context.Context, p peer.ID, network MessageNetwork, maxMsgSize int, sendErrorBackoff time.Duration) *MessageQueue { +func newMessageQueue(ctx context.Context, p peer.ID, network MessageNetwork, + maxMsgSize int, sendErrorBackoff time.Duration, dhTimeoutMgr DontHaveTimeoutManager) *MessageQueue { + mq := &MessageQueue{ ctx: ctx, p: p, network: network, + dhTimeoutMgr: dhTimeoutMgr, maxMessageSize: maxMsgSize, bcstWants: newRecallWantList(), peerWants: newRecallWantList(), @@ -191,9 +237,13 @@ func (mq *MessageQueue) AddCancels(cancelKs []cid.Cid) { return } + // Cancel any outstanding DONT_HAVE timers + mq.dhTimeoutMgr.CancelPending(cancelKs) + mq.wllock.Lock() defer mq.wllock.Unlock() + // Remove keys from broadcast and peer wants, and add to cancels for _, c := range cancelKs { mq.bcstWants.Remove(c) mq.peerWants.Remove(c) @@ -227,7 +277,14 @@ func (mq *MessageQueue) Shutdown() { close(mq.done) } +func (mq *MessageQueue) onShutdown() { + // Shut down the DONT_HAVE timeout manager + mq.dhTimeoutMgr.Shutdown() +} + func (mq *MessageQueue) runQueue() { + defer mq.onShutdown() + for { select { case <-mq.rebroadcastTimer.C: @@ -301,6 +358,12 @@ func (mq *MessageQueue) sendMessage() { return } + // Make sure the DONT_HAVE timeout manager has started + if !mq.sender.SupportsHave() { + // Note: Start is idempotent + mq.dhTimeoutMgr.Start() + } + // Convert want lists to a Bitswap Message message, onSent := mq.extractOutgoingMessage(mq.sender.SupportsHave()) if message == nil || message.Empty() { @@ -315,6 +378,8 @@ func (mq *MessageQueue) sendMessage() { // We were able to send successfully. onSent() + mq.simulateDontHaveWithTimeout(message) + // If the message was too big and only a subset of wants could be // sent, schedule sending the rest of the wants in the next // iteration of the event loop. @@ -327,6 +392,37 @@ func (mq *MessageQueue) sendMessage() { } } +// If the peer is running an older version of Bitswap that doesn't support the +// DONT_HAVE response, watch for timeouts on any want-blocks we sent the peer, +// and if there is a timeout simulate a DONT_HAVE response. 
+func (mq *MessageQueue) simulateDontHaveWithTimeout(msg bsmsg.BitSwapMessage) { + // If the peer supports DONT_HAVE responses, we don't need to simulate + if mq.sender.SupportsHave() { + return + } + + mq.wllock.Lock() + + // Get the CID of each want-block that expects a DONT_HAVE response + wantlist := msg.Wantlist() + wants := make([]cid.Cid, 0, len(wantlist)) + for _, entry := range wantlist { + if entry.WantType == pb.Message_Wantlist_Block && entry.SendDontHave { + // Unlikely, but just in case check that the block hasn't been + // received in the interim + c := entry.Cid + if _, ok := mq.peerWants.allWants.Contains(c); ok { + wants = append(wants, c) + } + } + } + + mq.wllock.Unlock() + + // Add wants to DONT_HAVE timeout manager + mq.dhTimeoutMgr.AddPending(wants) +} + // func (mq *MessageQueue) logOutgoingMessage(msg bsmsg.BitSwapMessage) { // entries := msg.Wantlist() // for _, e := range entries { @@ -420,6 +516,7 @@ func (mq *MessageQueue) extractOutgoingMessage(supportsHave bool) (bsmsg.BitSwap return msg, onSent } + func (mq *MessageQueue) initializeSender() error { if mq.sender != nil { return nil diff --git a/internal/messagequeue/messagequeue_test.go b/internal/messagequeue/messagequeue_test.go index ad66c944..0ea93c43 100644 --- a/internal/messagequeue/messagequeue_test.go +++ b/internal/messagequeue/messagequeue_test.go @@ -3,17 +3,19 @@ package messagequeue import ( "context" "errors" + "fmt" "testing" "time" - "github.com/ipfs/go-bitswap/message" "github.com/ipfs/go-bitswap/internal/testutil" + "github.com/ipfs/go-bitswap/message" cid "github.com/ipfs/go-cid" bsmsg "github.com/ipfs/go-bitswap/message" pb "github.com/ipfs/go-bitswap/message/pb" bsnet "github.com/ipfs/go-bitswap/network" peer "github.com/libp2p/go-libp2p-core/peer" + "github.com/libp2p/go-libp2p/p2p/protocol/ping" ) type fakeMessageNetwork struct { @@ -33,7 +35,35 @@ func (fmn *fakeMessageNetwork) NewMessageSender(context.Context, peer.ID) (bsnet return nil, fmn.messageSenderError } -func (fms *fakeMessageNetwork) Self() peer.ID { return "" } +func (fms *fakeMessageNetwork) Self() peer.ID { return "" } +func (fms *fakeMessageNetwork) Latency(peer.ID) time.Duration { return 0 } +func (fms *fakeMessageNetwork) Ping(context.Context, peer.ID) ping.Result { + return ping.Result{Error: fmt.Errorf("ping error")} +} + +type fakeDontHaveTimeoutMgr struct { + ks []cid.Cid +} + +func (fp *fakeDontHaveTimeoutMgr) Start() {} +func (fp *fakeDontHaveTimeoutMgr) Shutdown() {} +func (fp *fakeDontHaveTimeoutMgr) AddPending(ks []cid.Cid) { + s := cid.NewSet() + for _, c := range append(fp.ks, ks...) 
{ + s.Add(c) + } + fp.ks = s.Keys() +} +func (fp *fakeDontHaveTimeoutMgr) CancelPending(ks []cid.Cid) { + s := cid.NewSet() + for _, c := range fp.ks { + s.Add(c) + } + for _, c := range ks { + s.Remove(c) + } + fp.ks = s.Keys() +} type fakeMessageSender struct { sendError error @@ -56,6 +86,8 @@ func (fms *fakeMessageSender) Close() error { fms.fullClosed <- struct{}{} func (fms *fakeMessageSender) Reset() error { fms.reset <- struct{}{}; return nil } func (fms *fakeMessageSender) SupportsHave() bool { return fms.supportsHave } +func mockTimeoutCb(peer.ID, []cid.Cid) {} + func collectMessages(ctx context.Context, t *testing.T, messagesSent <-chan bsmsg.BitSwapMessage, @@ -90,7 +122,7 @@ func TestStartupAndShutdown(t *testing.T) { fakeSender := &fakeMessageSender{nil, fullClosedChan, resetChan, messagesSent, sendErrors, true} fakenet := &fakeMessageNetwork{nil, nil, fakeSender} peerID := testutil.GeneratePeers(1)[0] - messageQueue := New(ctx, peerID, fakenet) + messageQueue := New(ctx, peerID, fakenet, mockTimeoutCb) bcstwh := testutil.GenerateCids(10) messageQueue.Startup() @@ -132,7 +164,7 @@ func TestSendingMessagesDeduped(t *testing.T) { fakeSender := &fakeMessageSender{nil, fullClosedChan, resetChan, messagesSent, sendErrors, true} fakenet := &fakeMessageNetwork{nil, nil, fakeSender} peerID := testutil.GeneratePeers(1)[0] - messageQueue := New(ctx, peerID, fakenet) + messageQueue := New(ctx, peerID, fakenet, mockTimeoutCb) wantHaves := testutil.GenerateCids(10) wantBlocks := testutil.GenerateCids(10) @@ -155,7 +187,7 @@ func TestSendingMessagesPartialDupe(t *testing.T) { fakeSender := &fakeMessageSender{nil, fullClosedChan, resetChan, messagesSent, sendErrors, true} fakenet := &fakeMessageNetwork{nil, nil, fakeSender} peerID := testutil.GeneratePeers(1)[0] - messageQueue := New(ctx, peerID, fakenet) + messageQueue := New(ctx, peerID, fakenet, mockTimeoutCb) wantHaves := testutil.GenerateCids(10) wantBlocks := testutil.GenerateCids(10) @@ -178,7 +210,7 @@ func TestSendingMessagesPriority(t *testing.T) { fakeSender := &fakeMessageSender{nil, fullClosedChan, resetChan, messagesSent, sendErrors, true} fakenet := &fakeMessageNetwork{nil, nil, fakeSender} peerID := testutil.GeneratePeers(1)[0] - messageQueue := New(ctx, peerID, fakenet) + messageQueue := New(ctx, peerID, fakenet, mockTimeoutCb) wantHaves1 := testutil.GenerateCids(5) wantHaves2 := testutil.GenerateCids(5) wantHaves := append(wantHaves1, wantHaves2...) 
@@ -247,7 +279,7 @@ func TestCancelOverridesPendingWants(t *testing.T) { fakeSender := &fakeMessageSender{nil, fullClosedChan, resetChan, messagesSent, sendErrors, true} fakenet := &fakeMessageNetwork{nil, nil, fakeSender} peerID := testutil.GeneratePeers(1)[0] - messageQueue := New(ctx, peerID, fakenet) + messageQueue := New(ctx, peerID, fakenet, mockTimeoutCb) wantHaves := testutil.GenerateCids(2) wantBlocks := testutil.GenerateCids(2) @@ -281,7 +313,7 @@ func TestWantOverridesPendingCancels(t *testing.T) { fakeSender := &fakeMessageSender{nil, fullClosedChan, resetChan, messagesSent, sendErrors, true} fakenet := &fakeMessageNetwork{nil, nil, fakeSender} peerID := testutil.GeneratePeers(1)[0] - messageQueue := New(ctx, peerID, fakenet) + messageQueue := New(ctx, peerID, fakenet, mockTimeoutCb) cancels := testutil.GenerateCids(3) messageQueue.Startup() @@ -314,7 +346,7 @@ func TestWantlistRebroadcast(t *testing.T) { fakeSender := &fakeMessageSender{nil, fullClosedChan, resetChan, messagesSent, sendErrors, true} fakenet := &fakeMessageNetwork{nil, nil, fakeSender} peerID := testutil.GeneratePeers(1)[0] - messageQueue := New(ctx, peerID, fakenet) + messageQueue := New(ctx, peerID, fakenet, mockTimeoutCb) bcstwh := testutil.GenerateCids(10) wantHaves := testutil.GenerateCids(10) wantBlocks := testutil.GenerateCids(10) @@ -410,12 +442,13 @@ func TestSendingLargeMessages(t *testing.T) { fullClosedChan := make(chan struct{}, 1) fakeSender := &fakeMessageSender{nil, fullClosedChan, resetChan, messagesSent, sendErrors, true} fakenet := &fakeMessageNetwork{nil, nil, fakeSender} + dhtm := &fakeDontHaveTimeoutMgr{} peerID := testutil.GeneratePeers(1)[0] wantBlocks := testutil.GenerateCids(10) entrySize := 44 maxMsgSize := entrySize * 3 // 3 wants - messageQueue := newMessageQueue(ctx, peerID, fakenet, maxMsgSize, sendErrorBackoff) + messageQueue := newMessageQueue(ctx, peerID, fakenet, maxMsgSize, sendErrorBackoff, dhtm) messageQueue.Startup() messageQueue.AddWants(wantBlocks, []cid.Cid{}) @@ -442,7 +475,7 @@ func TestSendToPeerThatDoesntSupportHave(t *testing.T) { fakenet := &fakeMessageNetwork{nil, nil, fakeSender} peerID := testutil.GeneratePeers(1)[0] - messageQueue := New(ctx, peerID, fakenet) + messageQueue := New(ctx, peerID, fakenet, mockTimeoutCb) messageQueue.Startup() // If the remote peer doesn't support HAVE / DONT_HAVE messages @@ -488,6 +521,39 @@ func TestSendToPeerThatDoesntSupportHave(t *testing.T) { } } +func TestSendToPeerThatDoesntSupportHaveMonitorsTimeouts(t *testing.T) { + ctx := context.Background() + messagesSent := make(chan bsmsg.BitSwapMessage) + sendErrors := make(chan error) + resetChan := make(chan struct{}, 1) + fullClosedChan := make(chan struct{}, 1) + fakeSender := &fakeMessageSender{nil, fullClosedChan, resetChan, messagesSent, sendErrors, false} + fakenet := &fakeMessageNetwork{nil, nil, fakeSender} + peerID := testutil.GeneratePeers(1)[0] + + dhtm := &fakeDontHaveTimeoutMgr{} + messageQueue := newMessageQueue(ctx, peerID, fakenet, maxMessageSize, sendErrorBackoff, dhtm) + messageQueue.Startup() + + wbs := testutil.GenerateCids(10) + messageQueue.AddWants(wbs, nil) + collectMessages(ctx, t, messagesSent, 10*time.Millisecond) + + // Check want-blocks are added to DontHaveTimeoutMgr + if len(dhtm.ks) != len(wbs) { + t.Fatal("want-blocks not added to DontHaveTimeoutMgr") + } + + cancelCount := 2 + messageQueue.AddCancels(wbs[:cancelCount]) + collectMessages(ctx, t, messagesSent, 10*time.Millisecond) + + // Check want-blocks are removed from DontHaveTimeoutMgr + if 
len(dhtm.ks) != len(wbs)-cancelCount { + t.Fatal("want-blocks not removed from DontHaveTimeoutMgr") + } +} + func TestResendAfterError(t *testing.T) { ctx := context.Background() messagesSent := make(chan bsmsg.BitSwapMessage) @@ -496,9 +562,10 @@ func TestResendAfterError(t *testing.T) { fullClosedChan := make(chan struct{}, 1) fakeSender := &fakeMessageSender{nil, fullClosedChan, resetChan, messagesSent, sendErrors, true} fakenet := &fakeMessageNetwork{nil, nil, fakeSender} + dhtm := &fakeDontHaveTimeoutMgr{} peerID := testutil.GeneratePeers(1)[0] sendErrBackoff := 5 * time.Millisecond - messageQueue := newMessageQueue(ctx, peerID, fakenet, maxMessageSize, sendErrBackoff) + messageQueue := newMessageQueue(ctx, peerID, fakenet, maxMessageSize, sendErrBackoff, dhtm) wantBlocks := testutil.GenerateCids(10) wantHaves := testutil.GenerateCids(10) @@ -534,9 +601,10 @@ func TestResendAfterMaxRetries(t *testing.T) { fullClosedChan := make(chan struct{}, 1) fakeSender := &fakeMessageSender{nil, fullClosedChan, resetChan, messagesSent, sendErrors, true} fakenet := &fakeMessageNetwork{nil, nil, fakeSender} + dhtm := &fakeDontHaveTimeoutMgr{} peerID := testutil.GeneratePeers(1)[0] sendErrBackoff := 2 * time.Millisecond - messageQueue := newMessageQueue(ctx, peerID, fakenet, maxMessageSize, sendErrBackoff) + messageQueue := newMessageQueue(ctx, peerID, fakenet, maxMessageSize, sendErrBackoff, dhtm) wantBlocks := testutil.GenerateCids(10) wantHaves := testutil.GenerateCids(10) wantBlocks2 := testutil.GenerateCids(10) diff --git a/internal/testinstance/testinstance.go b/internal/testinstance/testinstance.go index 2068928d..b1651db1 100644 --- a/internal/testinstance/testinstance.go +++ b/internal/testinstance/testinstance.go @@ -5,8 +5,8 @@ import ( "time" bitswap "github.com/ipfs/go-bitswap" - bsnet "github.com/ipfs/go-bitswap/network" tn "github.com/ipfs/go-bitswap/internal/testnet" + bsnet "github.com/ipfs/go-bitswap/network" ds "github.com/ipfs/go-datastore" delayed "github.com/ipfs/go-datastore/delayed" ds_sync "github.com/ipfs/go-datastore/sync" @@ -19,24 +19,26 @@ import ( // NewTestInstanceGenerator generates a new InstanceGenerator for the given // testnet -func NewTestInstanceGenerator(net tn.Network, bsOptions ...bitswap.Option) InstanceGenerator { +func NewTestInstanceGenerator(net tn.Network, netOptions []bsnet.NetOpt, bsOptions []bitswap.Option) InstanceGenerator { ctx, cancel := context.WithCancel(context.Background()) return InstanceGenerator{ - net: net, - seq: 0, - ctx: ctx, // TODO take ctx as param to Next, Instances - cancel: cancel, - bsOptions: bsOptions, + net: net, + seq: 0, + ctx: ctx, // TODO take ctx as param to Next, Instances + cancel: cancel, + bsOptions: bsOptions, + netOptions: netOptions, } } // InstanceGenerator generates new test instances of bitswap+dependencies type InstanceGenerator struct { - seq int - net tn.Network - ctx context.Context - cancel context.CancelFunc - bsOptions []bitswap.Option + seq int + net tn.Network + ctx context.Context + cancel context.CancelFunc + bsOptions []bitswap.Option + netOptions []bsnet.NetOpt } // Close closes the clobal context, shutting down all test instances @@ -52,7 +54,7 @@ func (g *InstanceGenerator) Next() Instance { if err != nil { panic("FIXME") // TODO change signature } - return NewInstance(g.ctx, g.net, p, g.bsOptions...) 
+ return NewInstance(g.ctx, g.net, p, g.netOptions, g.bsOptions) } // Instances creates N test instances of bitswap + dependencies and connects @@ -63,6 +65,12 @@ func (g *InstanceGenerator) Instances(n int) []Instance { inst := g.Next() instances = append(instances, inst) } + ConnectInstances(instances) + return instances +} + +// ConnectInstances connects the given instances to each other +func ConnectInstances(instances []Instance) { for i, inst := range instances { for j := i + 1; j < len(instances); j++ { oinst := instances[j] @@ -72,7 +80,6 @@ func (g *InstanceGenerator) Instances(n int) []Instance { } } } - return instances } // Instance is a test instance of bitswap + dependencies for integration testing @@ -100,10 +107,10 @@ func (i *Instance) SetBlockstoreLatency(t time.Duration) time.Duration { // NB: It's easy make mistakes by providing the same peer ID to two different // instances. To safeguard, use the InstanceGenerator to generate instances. It's // just a much better idea. -func NewInstance(ctx context.Context, net tn.Network, p tnet.Identity, options ...bitswap.Option) Instance { +func NewInstance(ctx context.Context, net tn.Network, p tnet.Identity, netOptions []bsnet.NetOpt, bsOptions []bitswap.Option) Instance { bsdelay := delay.Fixed(0) - adapter := net.Adapter(p) + adapter := net.Adapter(p, netOptions...) dstore := ds_sync.MutexWrap(delayed.New(ds.NewMapDatastore(), bsdelay)) bstore, err := blockstore.CachedBlockstore(ctx, @@ -113,7 +120,7 @@ func NewInstance(ctx context.Context, net tn.Network, p tnet.Identity, options . panic(err.Error()) // FIXME perhaps change signature and return error. } - bs := bitswap.New(ctx, adapter, bstore, options...).(*bitswap.Bitswap) + bs := bitswap.New(ctx, adapter, bstore, bsOptions...).(*bitswap.Bitswap) return Instance{ Adapter: adapter, diff --git a/internal/testnet/virtual.go b/internal/testnet/virtual.go index 9a92d1c7..1d1c7b79 100644 --- a/internal/testnet/virtual.go +++ b/internal/testnet/virtual.go @@ -17,9 +17,11 @@ import ( "github.com/libp2p/go-libp2p-core/connmgr" "github.com/libp2p/go-libp2p-core/peer" + protocol "github.com/libp2p/go-libp2p-core/protocol" "github.com/libp2p/go-libp2p-core/routing" tnet "github.com/libp2p/go-libp2p-testing/net" mocknet "github.com/libp2p/go-libp2p/p2p/net/mock" + "github.com/libp2p/go-libp2p/p2p/protocol/ping" ) // VirtualNetwork generates a new testnet instance - a fake network that @@ -88,10 +90,23 @@ func (n *network) Adapter(p tnet.Identity, opts ...bsnet.NetOpt) bsnet.BitSwapNe n.mu.Lock() defer n.mu.Unlock() + s := bsnet.Settings{ + SupportedProtocols: []protocol.ID{ + bsnet.ProtocolBitswap, + bsnet.ProtocolBitswapOneOne, + bsnet.ProtocolBitswapOneZero, + bsnet.ProtocolBitswapNoVers, + }, + } + for _, opt := range opts { + opt(&s) + } + client := &networkClient{ - local: p.ID(), - network: n, - routing: n.routingserver.Client(p), + local: p.ID(), + network: n, + routing: n.routingserver.Client(p), + supportedProtocols: s.SupportedProtocols, } n.clients[p.ID()] = &receiverQueue{receiver: client} return client @@ -169,15 +184,26 @@ func (n *network) SendMessage( type networkClient struct { local peer.ID bsnet.Receiver - network *network - routing routing.Routing - stats bsnet.Stats + network *network + routing routing.Routing + stats bsnet.Stats + supportedProtocols []protocol.ID } func (nc *networkClient) Self() peer.ID { return nc.local } +func (nc *networkClient) Ping(ctx context.Context, p peer.ID) ping.Result { + return ping.Result{RTT: nc.Latency(p)} +} + +func (nc 
*networkClient) Latency(p peer.ID) time.Duration { + nc.network.mu.Lock() + defer nc.network.mu.Unlock() + return nc.network.latencies[nc.local][p] +} + func (nc *networkClient) SendMessage( ctx context.Context, to peer.ID, @@ -240,8 +266,20 @@ func (mp *messagePasser) Reset() error { return nil } +var oldProtos = map[protocol.ID]struct{}{ + bsnet.ProtocolBitswapNoVers: struct{}{}, + bsnet.ProtocolBitswapOneZero: struct{}{}, + bsnet.ProtocolBitswapOneOne: struct{}{}, +} + func (mp *messagePasser) SupportsHave() bool { - return true + protos := mp.net.network.clients[mp.target].receiver.supportedProtocols + for _, proto := range protos { + if _, ok := oldProtos[proto]; !ok { + return true + } + } + return false } func (nc *networkClient) NewMessageSender(ctx context.Context, p peer.ID) (bsnet.MessageSender, error) { diff --git a/network/interface.go b/network/interface.go index 704d851f..6b2878e3 100644 --- a/network/interface.go +++ b/network/interface.go @@ -2,6 +2,7 @@ package network import ( "context" + "time" bsmsg "github.com/ipfs/go-bitswap/message" @@ -10,6 +11,7 @@ import ( "github.com/libp2p/go-libp2p-core/connmgr" "github.com/libp2p/go-libp2p-core/peer" "github.com/libp2p/go-libp2p-core/protocol" + "github.com/libp2p/go-libp2p/p2p/protocol/ping" ) var ( @@ -26,6 +28,7 @@ var ( // BitSwapNetwork provides network connectivity for BitSwap sessions. type BitSwapNetwork interface { Self() peer.ID + // SendMessage sends a BitSwap message to a peer. SendMessage( context.Context, @@ -46,6 +49,8 @@ type BitSwapNetwork interface { Stats() Stats Routing + + Pinger } // MessageSender is an interface for sending a series of messages over the bitswap @@ -82,6 +87,14 @@ type Routing interface { Provide(context.Context, cid.Cid) error } +// Pinger is an interface to ping a peer and get the average latency of all pings +type Pinger interface { + // Ping a peer + Ping(context.Context, peer.ID) ping.Result + // Get the average latency of all pings + Latency(peer.ID) time.Duration +} + // Stats is a container for statistics about the bitswap network // the numbers inside are specific to bitswap, and not any other protocols // using the same underlying network. diff --git a/network/ipfs_impl.go b/network/ipfs_impl.go index 2a25b7a0..b73a2545 100644 --- a/network/ipfs_impl.go +++ b/network/ipfs_impl.go @@ -19,6 +19,7 @@ import ( peerstore "github.com/libp2p/go-libp2p-core/peerstore" "github.com/libp2p/go-libp2p-core/protocol" "github.com/libp2p/go-libp2p-core/routing" + "github.com/libp2p/go-libp2p/p2p/protocol/ping" msgio "github.com/libp2p/go-msgio" ma "github.com/multiformats/go-multiaddr" ) @@ -107,6 +108,17 @@ func (bsnet *impl) Self() peer.ID { return bsnet.host.ID() } +func (bsnet *impl) Ping(ctx context.Context, p peer.ID) ping.Result { + ctx, cancel := context.WithCancel(ctx) + defer cancel() + res := <-ping.Ping(ctx, bsnet.host, p) + return res +} + +func (bsnet *impl) Latency(p peer.ID) time.Duration { + return bsnet.host.Peerstore().LatencyEWMA(p) +} + // Indicates whether the given protocol supports HAVE / DONT_HAVE messages func (bsnet *impl) SupportsHave(proto protocol.ID) bool { switch proto {
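
Below is an illustrative sketch (not part of the patch) of how the pieces introduced above fit together in a test: one regular node plus one node constrained to the old Bitswap 1.1.0 protocol with DONT_HAVE responses disabled, mirroring BenchmarkFetchFromOldBitswap. The package clause and test name are assumptions for illustration; all other identifiers come from the patch itself.

package bitswap_test

import (
	"testing"
	"time"

	bitswap "github.com/ipfs/go-bitswap"
	testinstance "github.com/ipfs/go-bitswap/internal/testinstance"
	tn "github.com/ipfs/go-bitswap/internal/testnet"
	bsnet "github.com/ipfs/go-bitswap/network"
	delay "github.com/ipfs/go-ipfs-delay"
	mockrouting "github.com/ipfs/go-ipfs-routing/mock"
	protocol "github.com/libp2p/go-libp2p-core/protocol"
)

// TestFetchFromOldProtocolPeer is a hypothetical example showing how to set up
// a seed that behaves like an older Bitswap client: it only advertises the
// Bitswap 1.1.0 protocol ID and never sends DONT_HAVE responses.
func TestFetchFromOldProtocolPeer(t *testing.T) {
	net := tn.VirtualNetwork(mockrouting.NewServer(), delay.Fixed(10*time.Millisecond))

	// Old-style seed: old protocol ID, DONT_HAVE responses disabled.
	oldNetOpts := []bsnet.NetOpt{bsnet.SupportedProtocols([]protocol.ID{bsnet.ProtocolBitswapOneOne})}
	oldBsOpts := []bitswap.Option{bitswap.SetSendDontHaves(false)}
	oldGen := testinstance.NewTestInstanceGenerator(net, oldNetOpts, oldBsOpts)
	defer oldGen.Close()

	// Regular node running the current protocol.
	newGen := testinstance.NewTestInstanceGenerator(net, nil, nil)
	defer newGen.Close()

	instances := []testinstance.Instance{newGen.Next(), oldGen.Next()}
	testinstance.ConnectInstances(instances)

	// When the new node sends want-blocks to the old seed, its MessageQueue
	// sees that the sender doesn't support HAVE/DONT_HAVE and starts the
	// dontHaveTimeoutMgr. With the defaults in donthavetimeoutmgr.go and a
	// measured latency of, say, 10ms, the simulated DONT_HAVE fires after
	// roughly maxExpectedWantProcessTime + latencyMultiplier*latency
	// = 200ms + 2*10ms = 220ms, unless a block or HAVE arrives first and
	// cancels the pending want.
}

The key design point the sketch illustrates: nothing in the fetching node needs to know the seed is old; protocol negotiation (SupportsHave) decides per peer whether real DONT_HAVEs are expected or whether the timeout manager should simulate them.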