package bitswap_test
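
// These benchmarks compare the number of duplicate blocks received, the
// messages sent/received, and the wall-clock time of different block
// distributions and fetch patterns. A typical invocation (the flags below are
// illustrative, not required by this package):
//
//	go test -bench=BenchmarkDups2Nodes -run='^$' .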

import (
	"context"
	"encoding/json"
	"io/ioutil"
	"math/rand"
	"os"
	"strconv"
	"sync"
	"testing"
	"time"

	bitswap "github.com/ipfs/go-bitswap"
	bssession "github.com/ipfs/go-bitswap/session"
	testinstance "github.com/ipfs/go-bitswap/testinstance"
	tn "github.com/ipfs/go-bitswap/testnet"
	"github.com/ipfs/go-bitswap/testutil"
	blocks "github.com/ipfs/go-block-format"
	cid "github.com/ipfs/go-cid"
	blocksutil "github.com/ipfs/go-ipfs-blocksutil"
	delay "github.com/ipfs/go-ipfs-delay"
	mockrouting "github.com/ipfs/go-ipfs-routing/mock"
)

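// fetchFunc fetches the given keys from a Bitswap instance; each
// implementation exercises a different request pattern (one at a time,
// fixed-size batches, one big batch, or fully concurrent).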
type fetchFunc func(b *testing.B, bs *bitswap.Bitswap, ks []cid.Cid)

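// distFunc seeds the provider instances with blocks before a fetch is
// measured; each implementation models a different block distribution.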
type distFunc func(b *testing.B, provs []testinstance.Instance, blocks []blocks.Block)

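// runStats records the measurements for a single benchmark run. The
// accumulated benchmarkLog is marshalled to JSON when a top-level benchmark
// finishes.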
type runStats struct {
	Dups    uint64
	MsgSent uint64
	MsgRecd uint64
	Time    time.Duration
	Name    string
}

var benchmarkLog []runStats

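// BenchmarkDups2Nodes measures duplicate blocks, message counts, and fetch
// time for a range of block distributions and fetch patterns over a small
// virtual network with a fixed 10ms delay.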
func BenchmarkDups2Nodes(b *testing.B) {
	benchmarkLog = nil
	fixedDelay := delay.Fixed(10 * time.Millisecond)
	b.Run("AllToAll-OneAtATime", func(b *testing.B) {
		subtestDistributeAndFetch(b, 3, 100, fixedDelay, allToAll, oneAtATime)
	})
	b.Run("AllToAll-BigBatch", func(b *testing.B) {
		subtestDistributeAndFetch(b, 3, 100, fixedDelay, allToAll, batchFetchAll)
	})
	b.Run("Overlap1-OneAtATime", func(b *testing.B) {
		subtestDistributeAndFetch(b, 3, 100, fixedDelay, overlap1, oneAtATime)
	})
	b.Run("Overlap2-BatchBy10", func(b *testing.B) {
		subtestDistributeAndFetch(b, 3, 100, fixedDelay, overlap2, batchFetchBy10)
	})
	b.Run("Overlap3-OneAtATime", func(b *testing.B) {
		subtestDistributeAndFetch(b, 3, 100, fixedDelay, overlap3, oneAtATime)
	})
	b.Run("Overlap3-BatchBy10", func(b *testing.B) {
		subtestDistributeAndFetch(b, 3, 100, fixedDelay, overlap3, batchFetchBy10)
	})
	b.Run("Overlap3-AllConcurrent", func(b *testing.B) {
		subtestDistributeAndFetch(b, 3, 100, fixedDelay, overlap3, fetchAllConcurrent)
	})
	b.Run("Overlap3-BigBatch", func(b *testing.B) {
		subtestDistributeAndFetch(b, 3, 100, fixedDelay, overlap3, batchFetchAll)
	})
	b.Run("Overlap3-UnixfsFetch", func(b *testing.B) {
		subtestDistributeAndFetch(b, 3, 100, fixedDelay, overlap3, unixfsFileFetch)
	})
	b.Run("10Nodes-AllToAll-OneAtATime", func(b *testing.B) {
		subtestDistributeAndFetch(b, 10, 100, fixedDelay, allToAll, oneAtATime)
	})
	b.Run("10Nodes-AllToAll-BatchFetchBy10", func(b *testing.B) {
		subtestDistributeAndFetch(b, 10, 100, fixedDelay, allToAll, batchFetchBy10)
	})
	b.Run("10Nodes-AllToAll-BigBatch", func(b *testing.B) {
		subtestDistributeAndFetch(b, 10, 100, fixedDelay, allToAll, batchFetchAll)
	})
	b.Run("10Nodes-AllToAll-AllConcurrent", func(b *testing.B) {
		subtestDistributeAndFetch(b, 10, 100, fixedDelay, allToAll, fetchAllConcurrent)
	})
	b.Run("10Nodes-AllToAll-UnixfsFetch", func(b *testing.B) {
		subtestDistributeAndFetch(b, 10, 100, fixedDelay, allToAll, unixfsFileFetch)
	})
	b.Run("10Nodes-OnePeerPerBlock-OneAtATime", func(b *testing.B) {
		subtestDistributeAndFetch(b, 10, 100, fixedDelay, onePeerPerBlock, oneAtATime)
	})
	b.Run("10Nodes-OnePeerPerBlock-BigBatch", func(b *testing.B) {
		subtestDistributeAndFetch(b, 10, 100, fixedDelay, onePeerPerBlock, batchFetchAll)
	})
	b.Run("10Nodes-OnePeerPerBlock-UnixfsFetch", func(b *testing.B) {
		subtestDistributeAndFetch(b, 10, 100, fixedDelay, onePeerPerBlock, unixfsFileFetch)
	})
	b.Run("200Nodes-AllToAll-BigBatch", func(b *testing.B) {
		subtestDistributeAndFetch(b, 200, 20, fixedDelay, allToAll, batchFetchAll)
	})
	out, _ := json.MarshalIndent(benchmarkLog, "", "  ")
	_ = ioutil.WriteFile("tmp/benchmark.json", out, 0666)
}

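// Latency profiles used to model real-world network conditions.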
const fastSpeed = 60 * time.Millisecond
const mediumSpeed = 200 * time.Millisecond
const slowSpeed = 800 * time.Millisecond
const superSlowSpeed = 4000 * time.Millisecond
const distribution = 20 * time.Millisecond
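// Bandwidth profiles (with deviations applied by the variable rate limit
// generator) and the standard block size for the rate-limited runs.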
const fastBandwidth = 1250000.0
const fastBandwidthDeviation = 300000.0
const mediumBandwidth = 500000.0
const mediumBandwidthDeviation = 80000.0
const slowBandwidth = 100000.0
const slowBandwidthDeviation = 16500.0
const stdBlockSize = 8000

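// BenchmarkDupsManyNodesRealWorldNetwork runs the all-to-all big-batch
// benchmark over 300-node virtual networks with fast, average, and slow
// latency/bandwidth profiles. Setting the BENCHMARK_SEED environment variable
// makes the generated delays and rate limits deterministic, e.g.
// (illustrative invocation):
//
//	BENCHMARK_SEED=42 go test -bench=BenchmarkDupsManyNodesRealWorldNetwork -run='^$' .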
func BenchmarkDupsManyNodesRealWorldNetwork(b *testing.B) {
	benchmarkLog = nil
	benchmarkSeed, err := strconv.ParseInt(os.Getenv("BENCHMARK_SEED"), 10, 64)
	var randomGen *rand.Rand
	if err == nil {
		randomGen = rand.New(rand.NewSource(benchmarkSeed))
	}

	fastNetworkDelayGenerator := tn.InternetLatencyDelayGenerator(
		mediumSpeed-fastSpeed, slowSpeed-fastSpeed,
		0.0, 0.0, distribution, randomGen)
	fastNetworkDelay := delay.Delay(fastSpeed, fastNetworkDelayGenerator)
	fastBandwidthGenerator := tn.VariableRateLimitGenerator(fastBandwidth, fastBandwidthDeviation, randomGen)
	averageNetworkDelayGenerator := tn.InternetLatencyDelayGenerator(
		mediumSpeed-fastSpeed, slowSpeed-fastSpeed,
		0.3, 0.3, distribution, randomGen)
	averageNetworkDelay := delay.Delay(fastSpeed, averageNetworkDelayGenerator)
	averageBandwidthGenerator := tn.VariableRateLimitGenerator(mediumBandwidth, mediumBandwidthDeviation, randomGen)
	slowNetworkDelayGenerator := tn.InternetLatencyDelayGenerator(
		mediumSpeed-fastSpeed, superSlowSpeed-fastSpeed,
		0.3, 0.3, distribution, randomGen)
	slowNetworkDelay := delay.Delay(fastSpeed, slowNetworkDelayGenerator)
	slowBandwidthGenerator := tn.VariableRateLimitGenerator(slowBandwidth, slowBandwidthDeviation, randomGen)

	b.Run("300Nodes-AllToAll-BigBatch-FastNetwork", func(b *testing.B) {
		subtestDistributeAndFetchRateLimited(b, 300, 200, fastNetworkDelay, fastBandwidthGenerator, stdBlockSize, allToAll, batchFetchAll)
	})
	b.Run("300Nodes-AllToAll-BigBatch-AverageVariableSpeedNetwork", func(b *testing.B) {
		subtestDistributeAndFetchRateLimited(b, 300, 200, averageNetworkDelay, averageBandwidthGenerator, stdBlockSize, allToAll, batchFetchAll)
	})
	b.Run("300Nodes-AllToAll-BigBatch-SlowVariableSpeedNetwork", func(b *testing.B) {
		subtestDistributeAndFetchRateLimited(b, 300, 200, slowNetworkDelay, slowBandwidthGenerator, stdBlockSize, allToAll, batchFetchAll)
	})
	out, _ := json.MarshalIndent(benchmarkLog, "", "  ")
	_ = ioutil.WriteFile("tmp/rw-benchmark.json", out, 0666)
}

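// subtestDistributeAndFetch runs b.N iterations of distributing numblks
// blocks across numnodes instances on a fixed-delay virtual network and
// fetching them with ff.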
func subtestDistributeAndFetch(b *testing.B, numnodes, numblks int, d delay.D, df distFunc, ff fetchFunc) {
	for i := 0; i < b.N; i++ {
		start := time.Now()
		net := tn.VirtualNetwork(mockrouting.NewServer(), d)

		ig := testinstance.NewTestInstanceGenerator(net)
		defer ig.Close()

		bg := blocksutil.NewBlockGenerator()

		instances := ig.Instances(numnodes)
		blocks := bg.Blocks(numblks)
		runDistribution(b, instances, blocks, df, ff, start)
	}
}

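// subtestDistributeAndFetchRateLimited is like subtestDistributeAndFetch but
// runs over a rate-limited virtual network and uses blocks of a fixed size.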
func subtestDistributeAndFetchRateLimited(b *testing.B, numnodes, numblks int, d delay.D, rateLimitGenerator tn.RateLimitGenerator, blockSize int64, df distFunc, ff fetchFunc) {
	for i := 0; i < b.N; i++ {
		start := time.Now()
		net := tn.RateLimitedVirtualNetwork(mockrouting.NewServer(), d, rateLimitGenerator)

		ig := testinstance.NewTestInstanceGenerator(net)
		defer ig.Close()

		instances := ig.Instances(numnodes)
		blocks := testutil.GenerateBlocksOfSize(numblks, blockSize)

		runDistribution(b, instances, blocks, df, ff, start)
	}
}

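// runDistribution seeds all but the last instance with blocks via df, fetches
// every block from the last instance via ff, and records the run's timing and
// message statistics.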
func runDistribution(b *testing.B, instances []testinstance.Instance, blocks []blocks.Block, df distFunc, ff fetchFunc, start time.Time) {
	numnodes := len(instances)

	fetcher := instances[numnodes-1]

	df(b, instances[:numnodes-1], blocks)

	var ks []cid.Cid
	for _, blk := range blocks {
		ks = append(ks, blk.Cid())
	}

	ff(b, fetcher.Exchange, ks)

	st, err := fetcher.Exchange.Stat()
	if err != nil {
		b.Fatal(err)
	}

	nst := fetcher.Adapter.Stats()
	stats := runStats{
		Time:    time.Since(start),
		MsgRecd: nst.MessagesRecvd,
		MsgSent: nst.MessagesSent,
		Dups:    st.DupBlksReceived,
		Name:    b.Name(),
	}
	benchmarkLog = append(benchmarkLog, stats)
	b.Logf("send/recv: %d / %d", nst.MessagesSent, nst.MessagesRecvd)
}

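// allToAll gives every block to every provider.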
func allToAll(b *testing.B, provs []testinstance.Instance, blocks []blocks.Block) {
	for _, p := range provs {
		if err := p.Blockstore().PutMany(blocks); err != nil {
			b.Fatal(err)
		}
	}
}

// overlap1 gives the first 75 blocks to the first peer and the last 75 blocks
// to the second peer, so with the standard 100-block set both peers hold the
// middle 50 blocks.
func overlap1(b *testing.B, provs []testinstance.Instance, blks []blocks.Block) {
	if len(provs) != 2 {
		b.Fatal("overlap1 only works with 2 provs")
	}
	bill := provs[0]
	jeff := provs[1]

	if err := bill.Blockstore().PutMany(blks[:75]); err != nil {
		b.Fatal(err)
	}
	if err := jeff.Blockstore().PutMany(blks[25:]); err != nil {
		b.Fatal(err)
	}
}

// overlap2 gives every third block (indices divisible by 3) to both peers and
// alternates the remaining blocks between them: odd indices go to the first
// peer, even indices to the second.
func overlap2(b *testing.B, provs []testinstance.Instance, blks []blocks.Block) {
	if len(provs) != 2 {
		b.Fatal("overlap2 only works with 2 provs")
	}
	bill := provs[0]
	jeff := provs[1]

	for i, blk := range blks {
		if i%3 == 0 {
			bill.Blockstore().Put(blk)
			jeff.Blockstore().Put(blk)
		} else if i%2 == 1 {
			bill.Blockstore().Put(blk)
		} else {
			jeff.Blockstore().Put(blk)
		}
	}
}

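// overlap3 currently distributes blocks exactly like overlap2: every third
// block goes to both peers and the rest alternate between them.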
func overlap3(b *testing.B, provs []testinstance.Instance, blks []blocks.Block) {
	if len(provs) != 2 {
		b.Fatal("overlap3 only works with 2 provs")
	}

	bill := provs[0]
	jeff := provs[1]

	for i, blk := range blks {
		if i%3 == 0 {
			bill.Blockstore().Put(blk)
			jeff.Blockstore().Put(blk)
		} else if i%2 == 1 {
			bill.Blockstore().Put(blk)
		} else {
			jeff.Blockstore().Put(blk)
		}
	}
}

// onePeerPerBlock picks a random peer to hold each block. With this layout we
// shouldn't ever see duplicate blocks, so it mostly measures the performance
// of the sync algorithm itself.
func onePeerPerBlock(b *testing.B, provs []testinstance.Instance, blks []blocks.Block) {
	for _, blk := range blks {
		provs[rand.Intn(len(provs))].Blockstore().Put(blk)
	}
}

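// oneAtATime fetches each block with a separate, sequential GetBlock call on
// a single session.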
func oneAtATime(b *testing.B, bs *bitswap.Bitswap, ks []cid.Cid) {
	ses := bs.NewSession(context.Background()).(*bssession.Session)
	for _, c := range ks {
		_, err := ses.GetBlock(context.Background(), c)
		if err != nil {
			b.Fatal(err)
		}
	}
	b.Logf("Session fetch latency: %s", ses.GetAverageLatency())
}

// batchFetchBy10 fetches data in batches, 10 blocks at a time.
func batchFetchBy10(b *testing.B, bs *bitswap.Bitswap, ks []cid.Cid) {
	ses := bs.NewSession(context.Background())
	for i := 0; i < len(ks); i += 10 {
		// Guard the final batch in case len(ks) is not a multiple of 10.
		end := i + 10
		if end > len(ks) {
			end = len(ks)
		}
		out, err := ses.GetBlocks(context.Background(), ks[i:end])
		if err != nil {
			b.Fatal(err)
		}
		// Drain the channel so the batch actually completes.
		for range out {
		}
	}
}

// fetchAllConcurrent issues a concurrent GetBlock request for every key.
func fetchAllConcurrent(b *testing.B, bs *bitswap.Bitswap, ks []cid.Cid) {
	ses := bs.NewSession(context.Background())

	var wg sync.WaitGroup
	for _, c := range ks {
		wg.Add(1)
		go func(c cid.Cid) {
			defer wg.Done()
			_, err := ses.GetBlock(context.Background(), c)
			if err != nil {
				// b.Fatal must not be called from a goroutine; report the
				// failure with b.Error instead.
				b.Error(err)
			}
		}(c)
	}
	wg.Wait()
}

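// batchFetchAll requests every key in a single GetBlocks call and drains the
// resulting channel.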
func batchFetchAll(b *testing.B, bs *bitswap.Bitswap, ks []cid.Cid) {
	ses := bs.NewSession(context.Background())
	out, err := ses.GetBlocks(context.Background(), ks)
	if err != nil {
		b.Fatal(err)
	}
	for range out {
	}
}

// unixfsFileFetch simulates the fetch pattern of syncing a unixfs file graph
// as fast as possible: the root block first, then the first tier of links,
// then everything that remains.
func unixfsFileFetch(b *testing.B, bs *bitswap.Bitswap, ks []cid.Cid) {
	ses := bs.NewSession(context.Background())
	_, err := ses.GetBlock(context.Background(), ks[0])
	if err != nil {
		b.Fatal(err)
	}

	out, err := ses.GetBlocks(context.Background(), ks[1:11])
	if err != nil {
		b.Fatal(err)
	}
	for range out {
	}

	out, err = ses.GetBlocks(context.Background(), ks[11:])
	if err != nil {
		b.Fatal(err)
	}
	for range out {
	}
}