package bitswap_test

import (
	"context"
	"encoding/json"
	"fmt"
	"io/ioutil"
	"math"
	"math/rand"
	"os"
	"strconv"
	"sync"
	"testing"
	"time"

	"github.com/ipfs/go-bitswap/testutil"
	blocks "github.com/ipfs/go-block-format"

	bitswap "github.com/ipfs/go-bitswap"
	bssession "github.com/ipfs/go-bitswap/session"
	testinstance "github.com/ipfs/go-bitswap/testinstance"
	tn "github.com/ipfs/go-bitswap/testnet"
	cid "github.com/ipfs/go-cid"
	delay "github.com/ipfs/go-ipfs-delay"
	mockrouting "github.com/ipfs/go-ipfs-routing/mock"
)
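
// These benchmarks exercise bitswap transfers over a mock network and can be
// run with the standard Go tooling, e.g.:
//
//	go test -bench=.
//
// Each benchmark appends per-run statistics to benchmarkLog, prints a summary
// via printResults, and writes the raw results as JSON into the tmp/
// directory (write errors are ignored, so create tmp/ beforehand if you want
// the JSON output).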

// fetchFunc fetches a set of blocks from a Bitswap instance, for example one
// at a time, in fixed-size batches, or with a single GetBlocks() call.
type fetchFunc func(b *testing.B, bs *bitswap.Bitswap, ks []cid.Cid)

// distFunc distributes a set of blocks across the seed (provider) instances
// before the fetch phase of a benchmark begins.
type distFunc func(b *testing.B, provs []testinstance.Instance, blocks []blocks.Block)

// runStats records the statistics of a single benchmark run.
type runStats struct {
	DupsRcvd uint64
	BlksRcvd uint64
	MsgSent  uint64
	MsgRecd  uint64
	Time     time.Duration
	Name     string
}

var benchmarkLog []runStats

// bench describes one benchmark case: a name, the number of nodes and blocks,
// and the distribution and fetch strategies to use.
type bench struct {
	name       string
	nodeCount  int
	blockCount int
	distFn     distFunc
	fetchFn    fetchFunc
}

var benches = []bench{
	// Fetch from two seed nodes that both have all 100 blocks
	// - request one at a time, in series
	bench{"3Nodes-AllToAll-OneAtATime", 3, 100, allToAll, oneAtATime},
	// - request all 100 with a single GetBlocks() call
	bench{"3Nodes-AllToAll-BigBatch", 3, 100, allToAll, batchFetchAll},

	// Fetch from two seed nodes, one at a time, where:
	// - node A has blocks 0 - 74
	// - node B has blocks 25 - 99
	bench{"3Nodes-Overlap1-OneAtATime", 3, 100, overlap1, oneAtATime},

	// Fetch from two seed nodes, where:
	// - node A has even blocks
	// - node B has odd blocks
	// - both nodes have every third block

	// - request one at a time, in series
	bench{"3Nodes-Overlap3-OneAtATime", 3, 100, overlap2, oneAtATime},
	// - request 10 at a time, in series
	bench{"3Nodes-Overlap3-BatchBy10", 3, 100, overlap2, batchFetchBy10},
	// - request all 100 in parallel as individual GetBlock() calls
	bench{"3Nodes-Overlap3-AllConcurrent", 3, 100, overlap2, fetchAllConcurrent},
	// - request all 100 with a single GetBlocks() call
	bench{"3Nodes-Overlap3-BigBatch", 3, 100, overlap2, batchFetchAll},
	// - request 1, then 10, then 89 blocks (similar to how IPFS would fetch a file)
	bench{"3Nodes-Overlap3-UnixfsFetch", 3, 100, overlap2, unixfsFileFetch},

	// Fetch from nine seed nodes, all nodes have all blocks
	// - request one at a time, in series
	bench{"10Nodes-AllToAll-OneAtATime", 10, 100, allToAll, oneAtATime},
	// - request 10 at a time, in series
	bench{"10Nodes-AllToAll-BatchFetchBy10", 10, 100, allToAll, batchFetchBy10},
	// - request all 100 with a single GetBlocks() call
	bench{"10Nodes-AllToAll-BigBatch", 10, 100, allToAll, batchFetchAll},
	// - request all 100 in parallel as individual GetBlock() calls
	bench{"10Nodes-AllToAll-AllConcurrent", 10, 100, allToAll, fetchAllConcurrent},
	// - request 1, then 10, then 89 blocks (similar to how IPFS would fetch a file)
	bench{"10Nodes-AllToAll-UnixfsFetch", 10, 100, allToAll, unixfsFileFetch},
	// - follow a typical IPFS request pattern for 1000 blocks
	bench{"10Nodes-AllToAll-UnixfsFetchLarge", 10, 1000, allToAll, unixfsFileFetchLarge},

	// Fetch from nine seed nodes, blocks are distributed randomly across all nodes (no dups)
	// - request one at a time, in series
	bench{"10Nodes-OnePeerPerBlock-OneAtATime", 10, 100, onePeerPerBlock, oneAtATime},
	// - request all 100 with a single GetBlocks() call
	bench{"10Nodes-OnePeerPerBlock-BigBatch", 10, 100, onePeerPerBlock, batchFetchAll},
	// - request 1, then 10, then 89 blocks (similar to how IPFS would fetch a file)
	bench{"10Nodes-OnePeerPerBlock-UnixfsFetch", 10, 100, onePeerPerBlock, unixfsFileFetch},

	// Fetch from 199 seed nodes, all nodes have all blocks, fetch all 20 blocks with a single GetBlocks() call
	bench{"200Nodes-AllToAll-BigBatch", 200, 20, allToAll, batchFetchAll},
}

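// BenchmarkFixedDelay runs every case in benches over a virtual network with
// a fixed 10ms message latency and no blockstore latency.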
func BenchmarkFixedDelay(b *testing.B) {
	benchmarkLog = nil
	fixedDelay := delay.Fixed(10 * time.Millisecond)
	bstoreLatency := time.Duration(0)

	for _, bch := range benches {
		b.Run(bch.name, func(b *testing.B) {
			subtestDistributeAndFetch(b, bch.nodeCount, bch.blockCount, fixedDelay, bstoreLatency, bch.distFn, bch.fetchFn)
		})
	}

	out, _ := json.MarshalIndent(benchmarkLog, "", "  ")
	_ = ioutil.WriteFile("tmp/benchmark.json", out, 0666)
	printResults(benchmarkLog)
}

const datacenterSpeed = 5 * time.Millisecond
const fastSpeed = 60 * time.Millisecond
const mediumSpeed = 200 * time.Millisecond
const slowSpeed = 800 * time.Millisecond
const superSlowSpeed = 4000 * time.Millisecond
const datacenterDistribution = 3 * time.Millisecond
const distribution = 20 * time.Millisecond
const datacenterBandwidth = 125000000.0
const datacenterBandwidthDeviation = 3000000.0
const fastBandwidth = 1250000.0
const fastBandwidthDeviation = 300000.0
const mediumBandwidth = 500000.0
const mediumBandwidthDeviation = 80000.0
const slowBandwidth = 100000.0
const slowBandwidthDeviation = 16500.0
const rootBlockSize = 800
const stdBlockSize = 8000
const largeBlockSize = int64(256 * 1024)

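// BenchmarkRealWorld runs the all-to-all BigBatch scenario over simulated
// fast, average and slow internet-like networks with variable latency and
// bandwidth. Setting the BENCHMARK_SEED environment variable to an integer
// seeds the random generators used for that variation, making runs
// reproducible.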
func BenchmarkRealWorld(b *testing.B) {
	benchmarkLog = nil
	benchmarkSeed, err := strconv.ParseInt(os.Getenv("BENCHMARK_SEED"), 10, 64)
	var randomGen *rand.Rand
	if err == nil {
		randomGen = rand.New(rand.NewSource(benchmarkSeed))
	}

	fastNetworkDelayGenerator := tn.InternetLatencyDelayGenerator(
		mediumSpeed-fastSpeed, slowSpeed-fastSpeed,
		0.0, 0.0, distribution, randomGen)
	fastNetworkDelay := delay.Delay(fastSpeed, fastNetworkDelayGenerator)
	fastBandwidthGenerator := tn.VariableRateLimitGenerator(fastBandwidth, fastBandwidthDeviation, randomGen)
	averageNetworkDelayGenerator := tn.InternetLatencyDelayGenerator(
		mediumSpeed-fastSpeed, slowSpeed-fastSpeed,
		0.3, 0.3, distribution, randomGen)
	averageNetworkDelay := delay.Delay(fastSpeed, averageNetworkDelayGenerator)
	averageBandwidthGenerator := tn.VariableRateLimitGenerator(mediumBandwidth, mediumBandwidthDeviation, randomGen)
	slowNetworkDelayGenerator := tn.InternetLatencyDelayGenerator(
		mediumSpeed-fastSpeed, superSlowSpeed-fastSpeed,
		0.3, 0.3, distribution, randomGen)
	slowNetworkDelay := delay.Delay(fastSpeed, slowNetworkDelayGenerator)
	slowBandwidthGenerator := tn.VariableRateLimitGenerator(slowBandwidth, slowBandwidthDeviation, randomGen)
	bstoreLatency := time.Duration(0)

	b.Run("200Nodes-AllToAll-BigBatch-FastNetwork", func(b *testing.B) {
		subtestDistributeAndFetchRateLimited(b, 300, 200, fastNetworkDelay, fastBandwidthGenerator, stdBlockSize, bstoreLatency, allToAll, batchFetchAll)
	})
	b.Run("200Nodes-AllToAll-BigBatch-AverageVariableSpeedNetwork", func(b *testing.B) {
		subtestDistributeAndFetchRateLimited(b, 300, 200, averageNetworkDelay, averageBandwidthGenerator, stdBlockSize, bstoreLatency, allToAll, batchFetchAll)
	})
	b.Run("200Nodes-AllToAll-BigBatch-SlowVariableSpeedNetwork", func(b *testing.B) {
		subtestDistributeAndFetchRateLimited(b, 300, 200, slowNetworkDelay, slowBandwidthGenerator, stdBlockSize, bstoreLatency, allToAll, batchFetchAll)
	})
	out, _ := json.MarshalIndent(benchmarkLog, "", "  ")
	_ = ioutil.WriteFile("tmp/rw-benchmark.json", out, 0666)
	printResults(benchmarkLog)
}

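// BenchmarkDatacenter simulates a small datacenter-like network: low latency,
// high bandwidth, large blocks, and 25ms of blockstore latency.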
func BenchmarkDatacenter(b *testing.B) {
	benchmarkLog = nil
	benchmarkSeed, err := strconv.ParseInt(os.Getenv("BENCHMARK_SEED"), 10, 64)
	var randomGen *rand.Rand
	if err == nil {
		randomGen = rand.New(rand.NewSource(benchmarkSeed))
	}

	datacenterNetworkDelayGenerator := tn.InternetLatencyDelayGenerator(
		fastSpeed-datacenterSpeed, (fastSpeed-datacenterSpeed)/2,
		0.0, 0.0, datacenterDistribution, randomGen)
	datacenterNetworkDelay := delay.Delay(datacenterSpeed, datacenterNetworkDelayGenerator)
	datacenterBandwidthGenerator := tn.VariableRateLimitGenerator(datacenterBandwidth, datacenterBandwidthDeviation, randomGen)
	bstoreLatency := time.Millisecond * 25

	b.Run("3Nodes-Overlap3-UnixfsFetch", func(b *testing.B) {
		subtestDistributeAndFetchRateLimited(b, 3, 100, datacenterNetworkDelay, datacenterBandwidthGenerator, largeBlockSize, bstoreLatency, allToAll, unixfsFileFetch)
	})
	out, _ := json.MarshalIndent(benchmarkLog, "", "  ")
	_ = ioutil.WriteFile("tmp/rb-benchmark.json", out, 0666)
	printResults(benchmarkLog)
}

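// BenchmarkDatacenterMultiLeechMultiSeed runs a datacenter-like scenario in
// which three leeches concurrently fetch 1000 large blocks from three seeds.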
func BenchmarkDatacenterMultiLeechMultiSeed(b *testing.B) {
	benchmarkLog = nil
	benchmarkSeed, err := strconv.ParseInt(os.Getenv("BENCHMARK_SEED"), 10, 64)
	var randomGen *rand.Rand
	if err == nil {
		randomGen = rand.New(rand.NewSource(benchmarkSeed))
	}

	datacenterNetworkDelayGenerator := tn.InternetLatencyDelayGenerator(
		fastSpeed-datacenterSpeed, (fastSpeed-datacenterSpeed)/2,
		0.0, 0.0, datacenterDistribution, randomGen)
	datacenterNetworkDelay := delay.Delay(datacenterSpeed, datacenterNetworkDelayGenerator)
	datacenterBandwidthGenerator := tn.VariableRateLimitGenerator(datacenterBandwidth, datacenterBandwidthDeviation, randomGen)
	bstoreLatency := time.Millisecond * 25

	b.Run("3Leech3Seed-AllToAll-UnixfsFetch", func(b *testing.B) {
		d := datacenterNetworkDelay
		rateLimitGenerator := datacenterBandwidthGenerator
		blockSize := largeBlockSize
		df := allToAll
		ff := unixfsFileFetchLarge
		numnodes := 6
		numblks := 1000

		for i := 0; i < b.N; i++ {
			net := tn.RateLimitedVirtualNetwork(mockrouting.NewServer(), d, rateLimitGenerator)

			ig := testinstance.NewTestInstanceGenerator(net)
			defer ig.Close()

			instances := ig.Instances(numnodes)
			blocks := testutil.GenerateBlocksOfSize(numblks, blockSize)
			runDistributionMulti(b, instances, 3, blocks, bstoreLatency, df, ff)
		}
	})

	out, _ := json.MarshalIndent(benchmarkLog, "", "  ")
	_ = ioutil.WriteFile("tmp/rb-benchmark.json", out, 0666)
	printResults(benchmarkLog)
}

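// subtestDistributeAndFetch runs one benchmark case: for each iteration it
// builds a fresh virtual network, generates the blocks, distributes them to
// the seed nodes with df, and fetches them with ff.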
func subtestDistributeAndFetch(b *testing.B, numnodes, numblks int, d delay.D, bstoreLatency time.Duration, df distFunc, ff fetchFunc) {
	for i := 0; i < b.N; i++ {
		net := tn.VirtualNetwork(mockrouting.NewServer(), d)

		ig := testinstance.NewTestInstanceGenerator(net)

		instances := ig.Instances(numnodes)
		rootBlock := testutil.GenerateBlocksOfSize(1, rootBlockSize)
		blocks := testutil.GenerateBlocksOfSize(numblks, stdBlockSize)
		blocks[0] = rootBlock[0]
		runDistribution(b, instances, blocks, bstoreLatency, df, ff)
		ig.Close()
	}
}

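// subtestDistributeAndFetchRateLimited is like subtestDistributeAndFetch but
// runs over a rate-limited virtual network and uses a configurable block size.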
func subtestDistributeAndFetchRateLimited(b *testing.B, numnodes, numblks int, d delay.D, rateLimitGenerator tn.RateLimitGenerator, blockSize int64, bstoreLatency time.Duration, df distFunc, ff fetchFunc) {
	for i := 0; i < b.N; i++ {
		net := tn.RateLimitedVirtualNetwork(mockrouting.NewServer(), d, rateLimitGenerator)

		ig := testinstance.NewTestInstanceGenerator(net)
		defer ig.Close()

		instances := ig.Instances(numnodes)
		rootBlock := testutil.GenerateBlocksOfSize(1, rootBlockSize)
		blocks := testutil.GenerateBlocksOfSize(numblks, blockSize)
		blocks[0] = rootBlock[0]
		runDistribution(b, instances, blocks, bstoreLatency, df, ff)
	}
}

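// runDistributionMulti uses the last numFetchers instances as leeches and the
// rest as seeds, distributes the blocks to the seeds, then has every leech
// fetch all blocks concurrently. Message counts are recorded per leech, while
// the duplicate/received block counts are taken from the first leech's
// exchange stats.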
func runDistributionMulti(b *testing.B, instances []testinstance.Instance, numFetchers int, blocks []blocks.Block, bstoreLatency time.Duration, df distFunc, ff fetchFunc) {
	numnodes := len(instances)
	fetchers := instances[numnodes-numFetchers:]

	// Distribute blocks to seed nodes
	seeds := instances[:numnodes-numFetchers]
	df(b, seeds, blocks)

	// Set the blockstore latency on seed nodes
	if bstoreLatency > 0 {
		for _, i := range seeds {
			i.SetBlockstoreLatency(bstoreLatency)
		}
	}

	// Fetch blocks (from seed nodes to leech nodes)
	var ks []cid.Cid
	for _, blk := range blocks {
		ks = append(ks, blk.Cid())
	}

	start := time.Now()
	var wg sync.WaitGroup
	for _, fetcher := range fetchers {
		wg.Add(1)

		go func(ftchr testinstance.Instance) {
			defer wg.Done()

			ff(b, ftchr.Exchange, ks)
		}(fetcher)
	}
	wg.Wait()

	// Collect statistics
	fetcher := fetchers[0]
	st, err := fetcher.Exchange.Stat()
	if err != nil {
		b.Fatal(err)
	}

	for _, fetcher := range fetchers {
		nst := fetcher.Adapter.Stats()
		stats := runStats{
			Time:     time.Since(start),
			MsgRecd:  nst.MessagesRecvd,
			MsgSent:  nst.MessagesSent,
			DupsRcvd: st.DupBlksReceived,
			BlksRcvd: st.BlocksReceived,
			Name:     b.Name(),
		}
		benchmarkLog = append(benchmarkLog, stats)
	}
	// b.Logf("send/recv: %d / %d (dups: %d)", nst.MessagesSent, nst.MessagesRecvd, st.DupBlksReceived)
}

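// runDistribution uses the last instance as the leech and the rest as seeds,
// distributes the blocks to the seeds, fetches them all from the leech, and
// appends the resulting runStats to benchmarkLog.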
func runDistribution(b *testing.B, instances []testinstance.Instance, blocks []blocks.Block, bstoreLatency time.Duration, df distFunc, ff fetchFunc) {
	numnodes := len(instances)
	fetcher := instances[numnodes-1]

	// Distribute blocks to seed nodes
	seeds := instances[:numnodes-1]
	df(b, seeds, blocks)

	// Set the blockstore latency on seed nodes
	if bstoreLatency > 0 {
		for _, i := range seeds {
			i.SetBlockstoreLatency(bstoreLatency)
		}
	}

	// Fetch blocks (from seed nodes to leech nodes)
	var ks []cid.Cid
	for _, blk := range blocks {
		ks = append(ks, blk.Cid())
	}

	start := time.Now()
	ff(b, fetcher.Exchange, ks)

	// Collect statistics
	st, err := fetcher.Exchange.Stat()
	if err != nil {
		b.Fatal(err)
	}

	nst := fetcher.Adapter.Stats()
	stats := runStats{
		Time:     time.Since(start),
		MsgRecd:  nst.MessagesRecvd,
		MsgSent:  nst.MessagesSent,
		DupsRcvd: st.DupBlksReceived,
		BlksRcvd: st.BlocksReceived,
		Name:     b.Name(),
	}
	benchmarkLog = append(benchmarkLog, stats)
	// b.Logf("send/recv: %d / %d (dups: %d)", nst.MessagesSent, nst.MessagesRecvd, st.DupBlksReceived)
}

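// allToAll puts every block into the blockstore of every seed node.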
func allToAll(b *testing.B, provs []testinstance.Instance, blocks []blocks.Block) {
	for _, p := range provs {
		if err := p.Blockstore().PutMany(blocks); err != nil {
			b.Fatal(err)
		}
	}
}

// overlap1 gives the first 75 blocks to the first peer, and the last 75 blocks
// to the second peer. This means both peers have the middle 50 blocks
func overlap1(b *testing.B, provs []testinstance.Instance, blks []blocks.Block) {
	if len(provs) != 2 {
		b.Fatal("overlap1 only works with 2 provs")
	}
	bill := provs[0]
	jeff := provs[1]

	if err := bill.Blockstore().PutMany(blks[:75]); err != nil {
		b.Fatal(err)
	}
	if err := jeff.Blockstore().PutMany(blks[25:]); err != nil {
		b.Fatal(err)
	}
}

// overlap2 gives every even-numbered block to the first peer and every
// odd-numbered block to the second. It also gives every third block to both
// peers.
func overlap2(b *testing.B, provs []testinstance.Instance, blks []blocks.Block) {
	if len(provs) != 2 {
		b.Fatal("overlap2 only works with 2 provs")
	}
	bill := provs[0]
	jeff := provs[1]

	for i, blk := range blks {
		even := i%2 == 0
		third := i%3 == 0
		if third || even {
			if err := bill.Blockstore().Put(blk); err != nil {
				b.Fatal(err)
			}
		}
		if third || !even {
			if err := jeff.Blockstore().Put(blk); err != nil {
				b.Fatal(err)
			}
		}
	}
}

// onePeerPerBlock picks a random peer to hold each block. With this layout we
// shouldn't ever see duplicate blocks, so it mostly tests the performance of
// the sync algorithm.
func onePeerPerBlock(b *testing.B, provs []testinstance.Instance, blks []blocks.Block) {
	for _, blk := range blks {
		err := provs[rand.Intn(len(provs))].Blockstore().Put(blk)
		if err != nil {
			b.Fatal(err)
		}
	}
}

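// oneAtATime fetches each block with its own GetBlock() call, in series, on a
// single session.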
func oneAtATime(b *testing.B, bs *bitswap.Bitswap, ks []cid.Cid) {
	ses := bs.NewSession(context.Background()).(*bssession.Session)
	for _, c := range ks {
		_, err := ses.GetBlock(context.Background(), c)
		if err != nil {
			b.Fatal(err)
		}
	}
	// b.Logf("Session fetch latency: %s", ses.GetAverageLatency())
}

// fetch data in batches, 10 at a time
func batchFetchBy10(b *testing.B, bs *bitswap.Bitswap, ks []cid.Cid) {
	ses := bs.NewSession(context.Background())
	for i := 0; i < len(ks); i += 10 {
		out, err := ses.GetBlocks(context.Background(), ks[i:i+10])
		if err != nil {
			b.Fatal(err)
		}
		for range out {
		}
	}
}

// fetch each block at the same time concurrently
func fetchAllConcurrent(b *testing.B, bs *bitswap.Bitswap, ks []cid.Cid) {
	ses := bs.NewSession(context.Background())

	var wg sync.WaitGroup
	for _, c := range ks {
		wg.Add(1)
		go func(c cid.Cid) {
			defer wg.Done()
			_, err := ses.GetBlock(context.Background(), c)
			if err != nil {
				b.Error(err)
			}
		}(c)
	}
	wg.Wait()
}

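// batchFetchAll requests all blocks with a single GetBlocks() call and drains
// the resulting channel.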
func batchFetchAll(b *testing.B, bs *bitswap.Bitswap, ks []cid.Cid) {
	ses := bs.NewSession(context.Background())
	out, err := ses.GetBlocks(context.Background(), ks)
	if err != nil {
		b.Fatal(err)
	}
	for range out {
	}
}

// simulates the fetch pattern of trying to sync a unixfs file graph as fast as possible
func unixfsFileFetch(b *testing.B, bs *bitswap.Bitswap, ks []cid.Cid) {
	ses := bs.NewSession(context.Background())
	_, err := ses.GetBlock(context.Background(), ks[0])
	if err != nil {
		b.Fatal(err)
	}

	out, err := ses.GetBlocks(context.Background(), ks[1:11])
	if err != nil {
		b.Fatal(err)
	}
	for range out {
	}

	out, err = ses.GetBlocks(context.Background(), ks[11:])
	if err != nil {
		b.Fatal(err)
	}
	for range out {
	}
}

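// unixfsFileFetchLarge simulates fetching a large unixfs file: the root block
// first, then 10 blocks, then the next 89, then the remainder in groups of up
// to five concurrent 10-block GetBlocks() calls.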
func unixfsFileFetchLarge(b *testing.B, bs *bitswap.Bitswap, ks []cid.Cid) {
	ses := bs.NewSession(context.Background())
	_, err := ses.GetBlock(context.Background(), ks[0])
	if err != nil {
		b.Fatal(err)
	}

	out, err := ses.GetBlocks(context.Background(), ks[1:11])
	if err != nil {
		b.Fatal(err)
	}
	for range out {
	}

	out, err = ses.GetBlocks(context.Background(), ks[11:100])
	if err != nil {
		b.Fatal(err)
	}
	for range out {
	}

	rest := ks[100:]
	for len(rest) > 0 {
		var batch [][]cid.Cid
		for i := 0; i < 5 && len(rest) > 0; i++ {
			cnt := 10
			if len(rest) < 10 {
				cnt = len(rest)
			}
			group := rest[:cnt]
			rest = rest[cnt:]
			batch = append(batch, group)
		}

		var anyErr error
		var errLock sync.Mutex
		var wg sync.WaitGroup
		for _, group := range batch {
			wg.Add(1)
			go func(grp []cid.Cid) {
				defer wg.Done()

				// Use local variables so the concurrent goroutines don't race
				// on the shared out/err from the enclosing function.
				out, err := ses.GetBlocks(context.Background(), grp)
				if err != nil {
					errLock.Lock()
					anyErr = err
					errLock.Unlock()
				}
				for range out {
				}
			}(group)
		}
		wg.Wait()

		// Note: b.Fatal() cannot be called from within a go-routine
		if anyErr != nil {
			b.Fatal(anyErr)
		}
	}
}

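// printResults prints one summary line per benchmark name, averaging the
// message, duplicate and received-block counts over all runs with that name.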
func printResults(rs []runStats) {
	nameOrder := make([]string, 0)
	names := make(map[string]struct{})
	for i := 0; i < len(rs); i++ {
		if _, ok := names[rs[i].Name]; !ok {
			nameOrder = append(nameOrder, rs[i].Name)
			names[rs[i].Name] = struct{}{}
		}
	}

	for i := 0; i < len(names); i++ {
		name := nameOrder[i]
		count := 0
		sent := 0.0
		rcvd := 0.0
		dups := 0.0
		blks := 0.0
		elpd := 0.0
		for i := 0; i < len(rs); i++ {
			if rs[i].Name == name {
				count++
				sent += float64(rs[i].MsgSent)
				rcvd += float64(rs[i].MsgRecd)
				dups += float64(rs[i].DupsRcvd)
				blks += float64(rs[i].BlksRcvd)
				elpd += float64(rs[i].Time)
			}
		}
		sent /= float64(count)
		rcvd /= float64(count)
		dups /= float64(count)
		blks /= float64(count)

		label := fmt.Sprintf("%s (%d runs / %.2fs):", name, count, elpd/1000000000.0)
		fmt.Printf("%-75s %s: sent %d, recv %d, dups %d / %d\n",
			label,
			fmtDuration(time.Duration(int64(math.Round(elpd/float64(count))))),
			int64(math.Round(sent)), int64(math.Round(rcvd)),
			int64(math.Round(dups)), int64(math.Round(blks)))
	}
}

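// fmtDuration formats a duration as seconds with millisecond precision,
// e.g. "1.234s".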
func fmtDuration(d time.Duration) string {
	d = d.Round(time.Millisecond)
	s := d / time.Second
	d -= s * time.Second
	ms := d / time.Millisecond
	return fmt.Sprintf("%d.%03ds", s, ms)
}