benchmarks_test.go 21.3 KB
Newer Older
1
package bitswap_test
2 3 4 5

import (
	"context"
	"encoding/json"
dirkmc's avatar
dirkmc committed
6
	"fmt"
7
	"io/ioutil"
dirkmc's avatar
dirkmc committed
8
	"math"
9
	"math/rand"
10 11
	"os"
	"strconv"
12 13 14 15
	"sync"
	"testing"
	"time"

16
	"github.com/ipfs/go-bitswap/internal/testutil"
17
	blocks "github.com/ipfs/go-block-format"
18
	protocol "github.com/libp2p/go-libp2p-core/protocol"
19

20
	bitswap "github.com/ipfs/go-bitswap"
21 22 23
	bssession "github.com/ipfs/go-bitswap/internal/session"
	testinstance "github.com/ipfs/go-bitswap/internal/testinstance"
	tn "github.com/ipfs/go-bitswap/internal/testnet"
24
	bsnet "github.com/ipfs/go-bitswap/network"
25 26 27 28 29
	cid "github.com/ipfs/go-cid"
	delay "github.com/ipfs/go-ipfs-delay"
	mockrouting "github.com/ipfs/go-ipfs-routing/mock"
)

30
type fetchFunc func(b *testing.B, bs *bitswap.Bitswap, ks []cid.Cid)
31

32
type distFunc func(b *testing.B, provs []testinstance.Instance, blocks []blocks.Block)
33 34

type runStats struct {
dirkmc's avatar
dirkmc committed
35 36 37 38 39 40
	DupsRcvd uint64
	BlksRcvd uint64
	MsgSent  uint64
	MsgRecd  uint64
	Time     time.Duration
	Name     string
41 42 43 44
}

var benchmarkLog []runStats

dirkmc's avatar
dirkmc committed
45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107
type bench struct {
	name       string
	nodeCount  int
	blockCount int
	distFn     distFunc
	fetchFn    fetchFunc
}

var benches = []bench{
	// Fetch from two seed nodes that both have all 100 blocks
	// - request one at a time, in series
	bench{"3Nodes-AllToAll-OneAtATime", 3, 100, allToAll, oneAtATime},
	// - request all 100 with a single GetBlocks() call
	bench{"3Nodes-AllToAll-BigBatch", 3, 100, allToAll, batchFetchAll},

	// Fetch from two seed nodes, one at a time, where:
	// - node A has blocks 0 - 74
	// - node B has blocks 25 - 99
	bench{"3Nodes-Overlap1-OneAtATime", 3, 100, overlap1, oneAtATime},

	// Fetch from two seed nodes, where:
	// - node A has even blocks
	// - node B has odd blocks
	// - both nodes have every third block

	// - request one at a time, in series
	bench{"3Nodes-Overlap3-OneAtATime", 3, 100, overlap2, oneAtATime},
	// - request 10 at a time, in series
	bench{"3Nodes-Overlap3-BatchBy10", 3, 100, overlap2, batchFetchBy10},
	// - request all 100 in parallel as individual GetBlock() calls
	bench{"3Nodes-Overlap3-AllConcurrent", 3, 100, overlap2, fetchAllConcurrent},
	// - request all 100 with a single GetBlocks() call
	bench{"3Nodes-Overlap3-BigBatch", 3, 100, overlap2, batchFetchAll},
	// - request 1, then 10, then 89 blocks (similar to how IPFS would fetch a file)
	bench{"3Nodes-Overlap3-UnixfsFetch", 3, 100, overlap2, unixfsFileFetch},

	// Fetch from nine seed nodes, all nodes have all blocks
	// - request one at a time, in series
	bench{"10Nodes-AllToAll-OneAtATime", 10, 100, allToAll, oneAtATime},
	// - request 10 at a time, in series
	bench{"10Nodes-AllToAll-BatchFetchBy10", 10, 100, allToAll, batchFetchBy10},
	// - request all 100 with a single GetBlocks() call
	bench{"10Nodes-AllToAll-BigBatch", 10, 100, allToAll, batchFetchAll},
	// - request all 100 in parallel as individual GetBlock() calls
	bench{"10Nodes-AllToAll-AllConcurrent", 10, 100, allToAll, fetchAllConcurrent},
	// - request 1, then 10, then 89 blocks (similar to how IPFS would fetch a file)
	bench{"10Nodes-AllToAll-UnixfsFetch", 10, 100, allToAll, unixfsFileFetch},
	// - follow a typical IPFS request pattern for 1000 blocks
	bench{"10Nodes-AllToAll-UnixfsFetchLarge", 10, 1000, allToAll, unixfsFileFetchLarge},

	// Fetch from nine seed nodes, blocks are distributed randomly across all nodes (no dups)
	// - request one at a time, in series
	bench{"10Nodes-OnePeerPerBlock-OneAtATime", 10, 100, onePeerPerBlock, oneAtATime},
	// - request all 100 with a single GetBlocks() call
	bench{"10Nodes-OnePeerPerBlock-BigBatch", 10, 100, onePeerPerBlock, batchFetchAll},
	// - request 1, then 10, then 89 blocks (similar to how IPFS would fetch a file)
	bench{"10Nodes-OnePeerPerBlock-UnixfsFetch", 10, 100, onePeerPerBlock, unixfsFileFetch},

	// Fetch from 199 seed nodes, all nodes have all blocks, fetch all 20 blocks with a single GetBlocks() call
	bench{"200Nodes-AllToAll-BigBatch", 200, 20, allToAll, batchFetchAll},
}

func BenchmarkFixedDelay(b *testing.B) {
108
	benchmarkLog = nil
109
	fixedDelay := delay.Fixed(10 * time.Millisecond)
dirkmc's avatar
dirkmc committed
110
	bstoreLatency := time.Duration(0)
111

dirkmc's avatar
dirkmc committed
112 113 114 115 116
	for _, bch := range benches {
		b.Run(bch.name, func(b *testing.B) {
			subtestDistributeAndFetch(b, bch.nodeCount, bch.blockCount, fixedDelay, bstoreLatency, bch.distFn, bch.fetchFn)
		})
	}
117 118

	out, _ := json.MarshalIndent(benchmarkLog, "", "  ")
Steven Allen's avatar
Steven Allen committed
119
	_ = ioutil.WriteFile("tmp/benchmark.json", out, 0666)
dirkmc's avatar
dirkmc committed
120
	printResults(benchmarkLog)
121 122
}

123 124 125 126 127 128 129 130 131 132
type mixedBench struct {
	bench
	fetcherCount int // number of nodes that fetch data
	oldSeedCount int // number of seed nodes running old version of Bitswap
}

var mixedBenches = []mixedBench{
	mixedBench{bench{"3Nodes-Overlap3-OneAtATime", 3, 10, overlap2, oneAtATime}, 1, 2},
	mixedBench{bench{"3Nodes-AllToAll-OneAtATime", 3, 10, allToAll, oneAtATime}, 1, 2},
	mixedBench{bench{"3Nodes-Overlap3-AllConcurrent", 3, 10, overlap2, fetchAllConcurrent}, 1, 2},
Dirk McCormick's avatar
Dirk McCormick committed
133
	// mixedBench{bench{"3Nodes-Overlap3-UnixfsFetch", 3, 100, overlap2, unixfsFileFetch}, 1, 2},
134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190
}

func BenchmarkFetchFromOldBitswap(b *testing.B) {
	benchmarkLog = nil
	fixedDelay := delay.Fixed(10 * time.Millisecond)
	bstoreLatency := time.Duration(0)

	for _, bch := range mixedBenches {
		b.Run(bch.name, func(b *testing.B) {
			fetcherCount := bch.fetcherCount
			oldSeedCount := bch.oldSeedCount
			newSeedCount := bch.nodeCount - (fetcherCount + oldSeedCount)

			net := tn.VirtualNetwork(mockrouting.NewServer(), fixedDelay)

			// Simulate an older Bitswap node (old protocol ID) that doesn't
			// send DONT_HAVE responses
			oldProtocol := []protocol.ID{bsnet.ProtocolBitswapOneOne}
			oldNetOpts := []bsnet.NetOpt{bsnet.SupportedProtocols(oldProtocol)}
			oldBsOpts := []bitswap.Option{bitswap.SetSendDontHaves(false)}
			oldNodeGenerator := testinstance.NewTestInstanceGenerator(net, oldNetOpts, oldBsOpts)

			// Regular new Bitswap node
			newNodeGenerator := testinstance.NewTestInstanceGenerator(net, nil, nil)
			var instances []testinstance.Instance

			// Create new nodes (fetchers + seeds)
			for i := 0; i < fetcherCount+newSeedCount; i++ {
				inst := newNodeGenerator.Next()
				instances = append(instances, inst)
			}
			// Create old nodes (just seeds)
			for i := 0; i < oldSeedCount; i++ {
				inst := oldNodeGenerator.Next()
				instances = append(instances, inst)
			}
			// Connect all the nodes together
			testinstance.ConnectInstances(instances)

			// Generate blocks, with a smaller root block
			rootBlock := testutil.GenerateBlocksOfSize(1, rootBlockSize)
			blocks := testutil.GenerateBlocksOfSize(bch.blockCount, stdBlockSize)
			blocks[0] = rootBlock[0]

			// Run the distribution
			runDistributionMulti(b, instances[:fetcherCount], instances[fetcherCount:], blocks, bstoreLatency, bch.distFn, bch.fetchFn)

			newNodeGenerator.Close()
			oldNodeGenerator.Close()
		})
	}

	out, _ := json.MarshalIndent(benchmarkLog, "", "  ")
	_ = ioutil.WriteFile("tmp/benchmark.json", out, 0666)
	printResults(benchmarkLog)
}

dirkmc's avatar
dirkmc committed
191
const datacenterSpeed = 5 * time.Millisecond
192 193 194 195
const fastSpeed = 60 * time.Millisecond
const mediumSpeed = 200 * time.Millisecond
const slowSpeed = 800 * time.Millisecond
const superSlowSpeed = 4000 * time.Millisecond
dirkmc's avatar
dirkmc committed
196
const datacenterDistribution = 3 * time.Millisecond
197
const distribution = 20 * time.Millisecond
dirkmc's avatar
dirkmc committed
198 199
const datacenterBandwidth = 125000000.0
const datacenterBandwidthDeviation = 3000000.0
200 201 202 203 204 205
const fastBandwidth = 1250000.0
const fastBandwidthDeviation = 300000.0
const mediumBandwidth = 500000.0
const mediumBandwidthDeviation = 80000.0
const slowBandwidth = 100000.0
const slowBandwidthDeviation = 16500.0
dirkmc's avatar
dirkmc committed
206
const rootBlockSize = 800
207
const stdBlockSize = 8000
dirkmc's avatar
dirkmc committed
208
const largeBlockSize = int64(256 * 1024)
209

dirkmc's avatar
dirkmc committed
210
func BenchmarkRealWorld(b *testing.B) {
211
	benchmarkLog = nil
212 213 214 215 216 217
	benchmarkSeed, err := strconv.ParseInt(os.Getenv("BENCHMARK_SEED"), 10, 64)
	var randomGen *rand.Rand = nil
	if err == nil {
		randomGen = rand.New(rand.NewSource(benchmarkSeed))
	}

218 219
	fastNetworkDelayGenerator := tn.InternetLatencyDelayGenerator(
		mediumSpeed-fastSpeed, slowSpeed-fastSpeed,
220
		0.0, 0.0, distribution, randomGen)
221
	fastNetworkDelay := delay.Delay(fastSpeed, fastNetworkDelayGenerator)
222
	fastBandwidthGenerator := tn.VariableRateLimitGenerator(fastBandwidth, fastBandwidthDeviation, randomGen)
223 224
	averageNetworkDelayGenerator := tn.InternetLatencyDelayGenerator(
		mediumSpeed-fastSpeed, slowSpeed-fastSpeed,
225
		0.3, 0.3, distribution, randomGen)
226
	averageNetworkDelay := delay.Delay(fastSpeed, averageNetworkDelayGenerator)
227
	averageBandwidthGenerator := tn.VariableRateLimitGenerator(mediumBandwidth, mediumBandwidthDeviation, randomGen)
228 229
	slowNetworkDelayGenerator := tn.InternetLatencyDelayGenerator(
		mediumSpeed-fastSpeed, superSlowSpeed-fastSpeed,
230
		0.3, 0.3, distribution, randomGen)
231
	slowNetworkDelay := delay.Delay(fastSpeed, slowNetworkDelayGenerator)
232
	slowBandwidthGenerator := tn.VariableRateLimitGenerator(slowBandwidth, slowBandwidthDeviation, randomGen)
dirkmc's avatar
dirkmc committed
233
	bstoreLatency := time.Duration(0)
234 235

	b.Run("200Nodes-AllToAll-BigBatch-FastNetwork", func(b *testing.B) {
dirkmc's avatar
dirkmc committed
236
		subtestDistributeAndFetchRateLimited(b, 300, 200, fastNetworkDelay, fastBandwidthGenerator, stdBlockSize, bstoreLatency, allToAll, batchFetchAll)
237 238
	})
	b.Run("200Nodes-AllToAll-BigBatch-AverageVariableSpeedNetwork", func(b *testing.B) {
dirkmc's avatar
dirkmc committed
239
		subtestDistributeAndFetchRateLimited(b, 300, 200, averageNetworkDelay, averageBandwidthGenerator, stdBlockSize, bstoreLatency, allToAll, batchFetchAll)
240 241
	})
	b.Run("200Nodes-AllToAll-BigBatch-SlowVariableSpeedNetwork", func(b *testing.B) {
dirkmc's avatar
dirkmc committed
242
		subtestDistributeAndFetchRateLimited(b, 300, 200, slowNetworkDelay, slowBandwidthGenerator, stdBlockSize, bstoreLatency, allToAll, batchFetchAll)
243
	})
244
	out, _ := json.MarshalIndent(benchmarkLog, "", "  ")
Steven Allen's avatar
Steven Allen committed
245
	_ = ioutil.WriteFile("tmp/rw-benchmark.json", out, 0666)
dirkmc's avatar
dirkmc committed
246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 278 279 280 281 282 283 284 285 286 287 288 289 290 291 292 293 294 295 296 297 298
	printResults(benchmarkLog)
}

func BenchmarkDatacenter(b *testing.B) {
	benchmarkLog = nil
	benchmarkSeed, err := strconv.ParseInt(os.Getenv("BENCHMARK_SEED"), 10, 64)
	var randomGen *rand.Rand = nil
	if err == nil {
		randomGen = rand.New(rand.NewSource(benchmarkSeed))
	}

	datacenterNetworkDelayGenerator := tn.InternetLatencyDelayGenerator(
		fastSpeed-datacenterSpeed, (fastSpeed-datacenterSpeed)/2,
		0.0, 0.0, datacenterDistribution, randomGen)
	datacenterNetworkDelay := delay.Delay(datacenterSpeed, datacenterNetworkDelayGenerator)
	datacenterBandwidthGenerator := tn.VariableRateLimitGenerator(datacenterBandwidth, datacenterBandwidthDeviation, randomGen)
	bstoreLatency := time.Millisecond * 25

	b.Run("3Nodes-Overlap3-UnixfsFetch", func(b *testing.B) {
		subtestDistributeAndFetchRateLimited(b, 3, 100, datacenterNetworkDelay, datacenterBandwidthGenerator, largeBlockSize, bstoreLatency, allToAll, unixfsFileFetch)
	})
	out, _ := json.MarshalIndent(benchmarkLog, "", "  ")
	_ = ioutil.WriteFile("tmp/rb-benchmark.json", out, 0666)
	printResults(benchmarkLog)
}

func BenchmarkDatacenterMultiLeechMultiSeed(b *testing.B) {
	benchmarkLog = nil
	benchmarkSeed, err := strconv.ParseInt(os.Getenv("BENCHMARK_SEED"), 10, 64)
	var randomGen *rand.Rand = nil
	if err == nil {
		randomGen = rand.New(rand.NewSource(benchmarkSeed))
	}

	datacenterNetworkDelayGenerator := tn.InternetLatencyDelayGenerator(
		fastSpeed-datacenterSpeed, (fastSpeed-datacenterSpeed)/2,
		0.0, 0.0, datacenterDistribution, randomGen)
	datacenterNetworkDelay := delay.Delay(datacenterSpeed, datacenterNetworkDelayGenerator)
	datacenterBandwidthGenerator := tn.VariableRateLimitGenerator(datacenterBandwidth, datacenterBandwidthDeviation, randomGen)
	bstoreLatency := time.Millisecond * 25

	b.Run("3Leech3Seed-AllToAll-UnixfsFetch", func(b *testing.B) {
		d := datacenterNetworkDelay
		rateLimitGenerator := datacenterBandwidthGenerator
		blockSize := largeBlockSize
		df := allToAll
		ff := unixfsFileFetchLarge
		numnodes := 6
		numblks := 1000

		for i := 0; i < b.N; i++ {
			net := tn.RateLimitedVirtualNetwork(mockrouting.NewServer(), d, rateLimitGenerator)

299
			ig := testinstance.NewTestInstanceGenerator(net, nil, nil)
dirkmc's avatar
dirkmc committed
300 301 302 303
			defer ig.Close()

			instances := ig.Instances(numnodes)
			blocks := testutil.GenerateBlocksOfSize(numblks, blockSize)
304
			runDistributionMulti(b, instances[:3], instances[3:], blocks, bstoreLatency, df, ff)
dirkmc's avatar
dirkmc committed
305 306 307 308 309 310
		}
	})

	out, _ := json.MarshalIndent(benchmarkLog, "", "  ")
	_ = ioutil.WriteFile("tmp/rb-benchmark.json", out, 0666)
	printResults(benchmarkLog)
311 312
}

dirkmc's avatar
dirkmc committed
313
func subtestDistributeAndFetch(b *testing.B, numnodes, numblks int, d delay.D, bstoreLatency time.Duration, df distFunc, ff fetchFunc) {
314 315
	for i := 0; i < b.N; i++ {
		net := tn.VirtualNetwork(mockrouting.NewServer(), d)
316

317
		ig := testinstance.NewTestInstanceGenerator(net, nil, nil)
318

319
		instances := ig.Instances(numnodes)
dirkmc's avatar
dirkmc committed
320 321 322 323 324
		rootBlock := testutil.GenerateBlocksOfSize(1, rootBlockSize)
		blocks := testutil.GenerateBlocksOfSize(numblks, stdBlockSize)
		blocks[0] = rootBlock[0]
		runDistribution(b, instances, blocks, bstoreLatency, df, ff)
		ig.Close()
325
	}
326 327
}

dirkmc's avatar
dirkmc committed
328
func subtestDistributeAndFetchRateLimited(b *testing.B, numnodes, numblks int, d delay.D, rateLimitGenerator tn.RateLimitGenerator, blockSize int64, bstoreLatency time.Duration, df distFunc, ff fetchFunc) {
329 330
	for i := 0; i < b.N; i++ {
		net := tn.RateLimitedVirtualNetwork(mockrouting.NewServer(), d, rateLimitGenerator)
331

332
		ig := testinstance.NewTestInstanceGenerator(net, nil, nil)
333
		defer ig.Close()
334

335
		instances := ig.Instances(numnodes)
dirkmc's avatar
dirkmc committed
336
		rootBlock := testutil.GenerateBlocksOfSize(1, rootBlockSize)
337
		blocks := testutil.GenerateBlocksOfSize(numblks, blockSize)
dirkmc's avatar
dirkmc committed
338 339
		blocks[0] = rootBlock[0]
		runDistribution(b, instances, blocks, bstoreLatency, df, ff)
340
	}
341 342
}

343
func runDistributionMulti(b *testing.B, fetchers []testinstance.Instance, seeds []testinstance.Instance, blocks []blocks.Block, bstoreLatency time.Duration, df distFunc, ff fetchFunc) {
dirkmc's avatar
dirkmc committed
344 345 346 347 348 349 350 351 352 353 354 355 356 357 358 359 360 361 362 363 364 365 366 367 368 369 370 371 372 373 374 375 376 377 378 379 380 381 382 383 384 385 386 387 388 389 390 391 392 393
	// Distribute blocks to seed nodes
	df(b, seeds, blocks)

	// Set the blockstore latency on seed nodes
	if bstoreLatency > 0 {
		for _, i := range seeds {
			i.SetBlockstoreLatency(bstoreLatency)
		}
	}

	// Fetch blocks (from seed nodes to leech nodes)
	var ks []cid.Cid
	for _, blk := range blocks {
		ks = append(ks, blk.Cid())
	}

	start := time.Now()
	var wg sync.WaitGroup
	for _, fetcher := range fetchers {
		wg.Add(1)

		go func(ftchr testinstance.Instance) {
			defer wg.Done()

			ff(b, ftchr.Exchange, ks)
		}(fetcher)
	}
	wg.Wait()

	// Collect statistics
	fetcher := fetchers[0]
	st, err := fetcher.Exchange.Stat()
	if err != nil {
		b.Fatal(err)
	}

	for _, fetcher := range fetchers {
		nst := fetcher.Adapter.Stats()
		stats := runStats{
			Time:     time.Since(start),
			MsgRecd:  nst.MessagesRecvd,
			MsgSent:  nst.MessagesSent,
			DupsRcvd: st.DupBlksReceived,
			BlksRcvd: st.BlocksReceived,
			Name:     b.Name(),
		}
		benchmarkLog = append(benchmarkLog, stats)
	}
	// b.Logf("send/recv: %d / %d (dups: %d)", nst.MessagesSent, nst.MessagesRecvd, st.DupBlksReceived)
}
394

dirkmc's avatar
dirkmc committed
395 396
func runDistribution(b *testing.B, instances []testinstance.Instance, blocks []blocks.Block, bstoreLatency time.Duration, df distFunc, ff fetchFunc) {
	numnodes := len(instances)
397 398
	fetcher := instances[numnodes-1]

dirkmc's avatar
dirkmc committed
399 400 401
	// Distribute blocks to seed nodes
	seeds := instances[:numnodes-1]
	df(b, seeds, blocks)
402

dirkmc's avatar
dirkmc committed
403 404 405 406 407 408 409 410
	// Set the blockstore latency on seed nodes
	if bstoreLatency > 0 {
		for _, i := range seeds {
			i.SetBlockstoreLatency(bstoreLatency)
		}
	}

	// Fetch blocks (from seed nodes to leech nodes)
411 412 413 414 415
	var ks []cid.Cid
	for _, blk := range blocks {
		ks = append(ks, blk.Cid())
	}

dirkmc's avatar
dirkmc committed
416
	start := time.Now()
417
	ff(b, fetcher.Exchange, ks)
418

dirkmc's avatar
dirkmc committed
419
	// Collect statistics
420 421
	st, err := fetcher.Exchange.Stat()
	if err != nil {
422
		b.Fatal(err)
423 424
	}

425
	nst := fetcher.Adapter.Stats()
426
	stats := runStats{
dirkmc's avatar
dirkmc committed
427 428 429 430 431 432
		Time:     time.Since(start),
		MsgRecd:  nst.MessagesRecvd,
		MsgSent:  nst.MessagesSent,
		DupsRcvd: st.DupBlksReceived,
		BlksRcvd: st.BlocksReceived,
		Name:     b.Name(),
433 434
	}
	benchmarkLog = append(benchmarkLog, stats)
dirkmc's avatar
dirkmc committed
435
	// b.Logf("send/recv: %d / %d (dups: %d)", nst.MessagesSent, nst.MessagesRecvd, st.DupBlksReceived)
436 437
}

438
func allToAll(b *testing.B, provs []testinstance.Instance, blocks []blocks.Block) {
439 440
	for _, p := range provs {
		if err := p.Blockstore().PutMany(blocks); err != nil {
441
			b.Fatal(err)
442 443 444 445 446 447
		}
	}
}

// overlap1 gives the first 75 blocks to the first peer, and the last 75 blocks
// to the second peer. This means both peers have the middle 50 blocks
448
func overlap1(b *testing.B, provs []testinstance.Instance, blks []blocks.Block) {
449
	if len(provs) != 2 {
450
		b.Fatal("overlap1 only works with 2 provs")
451 452 453 454 455
	}
	bill := provs[0]
	jeff := provs[1]

	if err := bill.Blockstore().PutMany(blks[:75]); err != nil {
456
		b.Fatal(err)
457 458
	}
	if err := jeff.Blockstore().PutMany(blks[25:]); err != nil {
459
		b.Fatal(err)
460 461 462 463 464
	}
}

// overlap2 gives every even numbered block to the first peer, odd numbered
// blocks to the second.  it also gives every third block to both peers
465
func overlap2(b *testing.B, provs []testinstance.Instance, blks []blocks.Block) {
466
	if len(provs) != 2 {
467
		b.Fatal("overlap2 only works with 2 provs")
468 469 470 471 472
	}
	bill := provs[0]
	jeff := provs[1]

	for i, blk := range blks {
473 474 475 476 477 478
		even := i%2 == 0
		third := i%3 == 0
		if third || even {
			if err := bill.Blockstore().Put(blk); err != nil {
				b.Fatal(err)
			}
479
		}
480 481 482 483
		if third || !even {
			if err := jeff.Blockstore().Put(blk); err != nil {
				b.Fatal(err)
			}
484 485 486 487 488 489 490
		}
	}
}

// onePeerPerBlock picks a random peer to hold each block
// with this layout, we shouldnt actually ever see any duplicate blocks
// but we're mostly just testing performance of the sync algorithm
491
func onePeerPerBlock(b *testing.B, provs []testinstance.Instance, blks []blocks.Block) {
492
	for _, blk := range blks {
Steven Allen's avatar
Steven Allen committed
493 494 495 496
		err := provs[rand.Intn(len(provs))].Blockstore().Put(blk)
		if err != nil {
			b.Fatal(err)
		}
497 498 499
	}
}

500
func oneAtATime(b *testing.B, bs *bitswap.Bitswap, ks []cid.Cid) {
501
	ses := bs.NewSession(context.Background()).(*bssession.Session)
502 503 504
	for _, c := range ks {
		_, err := ses.GetBlock(context.Background(), c)
		if err != nil {
505
			b.Fatal(err)
506 507
		}
	}
dirkmc's avatar
dirkmc committed
508
	// b.Logf("Session fetch latency: %s", ses.GetAverageLatency())
509 510 511
}

// fetch data in batches, 10 at a time
512
func batchFetchBy10(b *testing.B, bs *bitswap.Bitswap, ks []cid.Cid) {
513 514 515 516
	ses := bs.NewSession(context.Background())
	for i := 0; i < len(ks); i += 10 {
		out, err := ses.GetBlocks(context.Background(), ks[i:i+10])
		if err != nil {
517
			b.Fatal(err)
518 519 520 521 522 523 524
		}
		for range out {
		}
	}
}

// fetch each block at the same time concurrently
525
func fetchAllConcurrent(b *testing.B, bs *bitswap.Bitswap, ks []cid.Cid) {
526 527 528 529 530 531 532 533 534
	ses := bs.NewSession(context.Background())

	var wg sync.WaitGroup
	for _, c := range ks {
		wg.Add(1)
		go func(c cid.Cid) {
			defer wg.Done()
			_, err := ses.GetBlock(context.Background(), c)
			if err != nil {
Steven Allen's avatar
Steven Allen committed
535
				b.Error(err)
536 537 538 539 540 541
			}
		}(c)
	}
	wg.Wait()
}

542
func batchFetchAll(b *testing.B, bs *bitswap.Bitswap, ks []cid.Cid) {
543 544 545
	ses := bs.NewSession(context.Background())
	out, err := ses.GetBlocks(context.Background(), ks)
	if err != nil {
546
		b.Fatal(err)
547 548 549 550 551 552
	}
	for range out {
	}
}

// simulates the fetch pattern of trying to sync a unixfs file graph as fast as possible
553
func unixfsFileFetch(b *testing.B, bs *bitswap.Bitswap, ks []cid.Cid) {
554 555 556
	ses := bs.NewSession(context.Background())
	_, err := ses.GetBlock(context.Background(), ks[0])
	if err != nil {
557
		b.Fatal(err)
558 559 560 561
	}

	out, err := ses.GetBlocks(context.Background(), ks[1:11])
	if err != nil {
562
		b.Fatal(err)
563 564 565 566 567 568
	}
	for range out {
	}

	out, err = ses.GetBlocks(context.Background(), ks[11:])
	if err != nil {
569
		b.Fatal(err)
570 571 572 573
	}
	for range out {
	}
}
dirkmc's avatar
dirkmc committed
574 575 576 577 578 579 580 581 582 583 584 585 586 587 588 589 590 591 592 593 594 595 596 597 598 599 600 601 602 603 604 605 606 607 608 609 610 611 612 613 614 615 616 617 618 619 620 621 622 623 624 625 626 627 628 629 630 631 632 633 634 635 636 637 638 639 640 641 642 643 644 645 646 647 648 649 650 651 652 653 654 655 656 657 658 659 660 661 662 663 664 665 666 667 668 669 670 671 672 673 674 675 676 677 678 679 680 681

func unixfsFileFetchLarge(b *testing.B, bs *bitswap.Bitswap, ks []cid.Cid) {
	ses := bs.NewSession(context.Background())
	_, err := ses.GetBlock(context.Background(), ks[0])
	if err != nil {
		b.Fatal(err)
	}

	out, err := ses.GetBlocks(context.Background(), ks[1:11])
	if err != nil {
		b.Fatal(err)
	}
	for range out {
	}

	out, err = ses.GetBlocks(context.Background(), ks[11:100])
	if err != nil {
		b.Fatal(err)
	}
	for range out {
	}

	rest := ks[100:]
	for len(rest) > 0 {
		var batch [][]cid.Cid
		for i := 0; i < 5 && len(rest) > 0; i++ {
			cnt := 10
			if len(rest) < 10 {
				cnt = len(rest)
			}
			group := rest[:cnt]
			rest = rest[cnt:]
			batch = append(batch, group)
		}

		var anyErr error
		var wg sync.WaitGroup
		for _, group := range batch {
			wg.Add(1)
			go func(grp []cid.Cid) {
				defer wg.Done()

				out, err = ses.GetBlocks(context.Background(), grp)
				if err != nil {
					anyErr = err
				}
				for range out {
				}
			}(group)
		}
		wg.Wait()

		// Note: b.Fatal() cannot be called from within a go-routine
		if anyErr != nil {
			b.Fatal(anyErr)
		}
	}
}

func printResults(rs []runStats) {
	nameOrder := make([]string, 0)
	names := make(map[string]struct{})
	for i := 0; i < len(rs); i++ {
		if _, ok := names[rs[i].Name]; !ok {
			nameOrder = append(nameOrder, rs[i].Name)
			names[rs[i].Name] = struct{}{}
		}
	}

	for i := 0; i < len(names); i++ {
		name := nameOrder[i]
		count := 0
		sent := 0.0
		rcvd := 0.0
		dups := 0.0
		blks := 0.0
		elpd := 0.0
		for i := 0; i < len(rs); i++ {
			if rs[i].Name == name {
				count++
				sent += float64(rs[i].MsgSent)
				rcvd += float64(rs[i].MsgRecd)
				dups += float64(rs[i].DupsRcvd)
				blks += float64(rs[i].BlksRcvd)
				elpd += float64(rs[i].Time)
			}
		}
		sent /= float64(count)
		rcvd /= float64(count)
		dups /= float64(count)
		blks /= float64(count)

		label := fmt.Sprintf("%s (%d runs / %.2fs):", name, count, elpd/1000000000.0)
		fmt.Printf("%-75s %s: sent %d, recv %d, dups %d / %d\n",
			label,
			fmtDuration(time.Duration(int64(math.Round(elpd/float64(count))))),
			int64(math.Round(sent)), int64(math.Round(rcvd)),
			int64(math.Round(dups)), int64(math.Round(blks)))
	}
}

func fmtDuration(d time.Duration) string {
	d = d.Round(time.Millisecond)
	s := d / time.Second
	d -= s * time.Second
	ms := d / time.Millisecond
	return fmt.Sprintf("%d.%03ds", s, ms)
}