benchmarks_test.go 11.2 KB
Newer Older
1
package bitswap_test
2 3 4 5 6 7 8 9 10 11

import (
	"context"
	"encoding/json"
	"io/ioutil"
	"math/rand"
	"sync"
	"testing"
	"time"

12
	"github.com/ipfs/go-bitswap/testutil"
13
	blocks "github.com/ipfs/go-block-format"
14

15
	bitswap "github.com/ipfs/go-bitswap"
16
	bssession "github.com/ipfs/go-bitswap/session"
17
	testinstance "github.com/ipfs/go-bitswap/testinstance"
18
	tn "github.com/ipfs/go-bitswap/testnet"
19 20 21 22 23 24
	cid "github.com/ipfs/go-cid"
	blocksutil "github.com/ipfs/go-ipfs-blocksutil"
	delay "github.com/ipfs/go-ipfs-delay"
	mockrouting "github.com/ipfs/go-ipfs-routing/mock"
)

25
// fetchFunc is a strategy for retrieving the blocks identified by ks through
// the given Bitswap exchange. The implementations in this file report fetch
// errors through b.
type fetchFunc func(b *testing.B, bs *bitswap.Bitswap, ks []cid.Cid)
26

27
// distFunc is a strategy for placing blocks into the blockstores of the
// provider instances before a fetch is attempted. runDistribution calls it
// with every instance except the last one, which acts as the fetcher.
type distFunc func(b *testing.B, provs []testinstance.Instance, blocks []blocks.Block)
28 29 30 31 32 33 34 35 36 37 38

// runStats is the record of a single sub-benchmark run. Records are collected
// in benchmarkLog and serialized to JSON by the top-level benchmarks.
type runStats struct {
	// Dups is the fetcher's duplicate-blocks-received counter (DupBlksReceived).
	Dups    uint64
	// MsgSent is the number of messages the fetcher's network adapter sent.
	MsgSent uint64
	// MsgRecd is the number of messages the fetcher's network adapter received.
	MsgRecd uint64
	// Time is the wall-clock time from the start of the subtest (including
	// network and instance setup) until the stats were gathered.
	Time    time.Duration
	// Name is the benchmark name as reported by b.Name().
	Name    string
}

// benchmarkLog accumulates one runStats entry per sub-benchmark run. Each
// top-level benchmark resets it to nil on entry and writes it out as JSON
// once its sub-benchmarks have finished. It is plain package-level state with
// no synchronization.
var benchmarkLog []runStats

39
func BenchmarkDups2Nodes(b *testing.B) {
40
	benchmarkLog = nil
41
	fixedDelay := delay.Fixed(10 * time.Millisecond)
42
	b.Run("AllToAll-OneAtATime", func(b *testing.B) {
43
		subtestDistributeAndFetch(b, 3, 100, fixedDelay, allToAll, oneAtATime)
44
	})
45
	b.Run("AllToAll-BigBatch", func(b *testing.B) {
46
		subtestDistributeAndFetch(b, 3, 100, fixedDelay, allToAll, batchFetchAll)
47 48
	})

49
	b.Run("Overlap1-OneAtATime", func(b *testing.B) {
50
		subtestDistributeAndFetch(b, 3, 100, fixedDelay, overlap1, oneAtATime)
51 52
	})

53
	b.Run("Overlap2-BatchBy10", func(b *testing.B) {
54
		subtestDistributeAndFetch(b, 3, 100, fixedDelay, overlap2, batchFetchBy10)
55 56
	})

57
	b.Run("Overlap3-OneAtATime", func(b *testing.B) {
58
		subtestDistributeAndFetch(b, 3, 100, fixedDelay, overlap3, oneAtATime)
59
	})
60
	b.Run("Overlap3-BatchBy10", func(b *testing.B) {
61
		subtestDistributeAndFetch(b, 3, 100, fixedDelay, overlap3, batchFetchBy10)
62
	})
63
	b.Run("Overlap3-AllConcurrent", func(b *testing.B) {
64
		subtestDistributeAndFetch(b, 3, 100, fixedDelay, overlap3, fetchAllConcurrent)
65
	})
66
	b.Run("Overlap3-BigBatch", func(b *testing.B) {
67
		subtestDistributeAndFetch(b, 3, 100, fixedDelay, overlap3, batchFetchAll)
68
	})
69
	b.Run("Overlap3-UnixfsFetch", func(b *testing.B) {
70
		subtestDistributeAndFetch(b, 3, 100, fixedDelay, overlap3, unixfsFileFetch)
71
	})
72
	b.Run("10Nodes-AllToAll-OneAtATime", func(b *testing.B) {
73
		subtestDistributeAndFetch(b, 10, 100, fixedDelay, allToAll, oneAtATime)
74
	})
75
	b.Run("10Nodes-AllToAll-BatchFetchBy10", func(b *testing.B) {
76
		subtestDistributeAndFetch(b, 10, 100, fixedDelay, allToAll, batchFetchBy10)
77
	})
78
	b.Run("10Nodes-AllToAll-BigBatch", func(b *testing.B) {
79
		subtestDistributeAndFetch(b, 10, 100, fixedDelay, allToAll, batchFetchAll)
80
	})
81
	b.Run("10Nodes-AllToAll-AllConcurrent", func(b *testing.B) {
82
		subtestDistributeAndFetch(b, 10, 100, fixedDelay, allToAll, fetchAllConcurrent)
83
	})
84
	b.Run("10Nodes-AllToAll-UnixfsFetch", func(b *testing.B) {
85
		subtestDistributeAndFetch(b, 10, 100, fixedDelay, allToAll, unixfsFileFetch)
86
	})
87
	b.Run("10Nodes-OnePeerPerBlock-OneAtATime", func(b *testing.B) {
88
		subtestDistributeAndFetch(b, 10, 100, fixedDelay, onePeerPerBlock, oneAtATime)
89
	})
90
	b.Run("10Nodes-OnePeerPerBlock-BigBatch", func(b *testing.B) {
91
		subtestDistributeAndFetch(b, 10, 100, fixedDelay, onePeerPerBlock, batchFetchAll)
92
	})
93
	b.Run("10Nodes-OnePeerPerBlock-UnixfsFetch", func(b *testing.B) {
94
		subtestDistributeAndFetch(b, 10, 100, fixedDelay, onePeerPerBlock, unixfsFileFetch)
95
	})
96
	b.Run("200Nodes-AllToAll-BigBatch", func(b *testing.B) {
97
		subtestDistributeAndFetch(b, 200, 20, fixedDelay, allToAll, batchFetchAll)
98 99
	})
	out, _ := json.MarshalIndent(benchmarkLog, "", "  ")
100
	ioutil.WriteFile("tmp/benchmark.json", out, 0666)
101 102
}

103 104 105 106 107
// Latency parameters used to build the simulated network delays in
// BenchmarkDupsManyNodesRealWorldNetwork. fastSpeed is used as the base delay;
// the other speeds are passed to the latency generator as offsets from it.
// NOTE(review): distribution is presumably the standard deviation handed to
// the latency generator - confirm against tn.InternetLatencyDelayGenerator.
const (
	fastSpeed      = 60 * time.Millisecond
	mediumSpeed    = 200 * time.Millisecond
	slowSpeed      = 800 * time.Millisecond
	superSlowSpeed = 4000 * time.Millisecond
	distribution   = 20 * time.Millisecond
)
108 109 110 111 112 113 114
// Rate-limit parameters for the simulated networks in
// BenchmarkDupsManyNodesRealWorldNetwork: a mean bandwidth and its deviation
// per network profile, plus the block size used for generated test blocks.
// NOTE(review): units are presumably bytes per second for the bandwidths and
// bytes for stdBlockSize - confirm against tn.VariableRateLimitGenerator and
// testutil.GenerateBlocksOfSize.
const (
	fastBandwidth            = 1250000.0
	fastBandwidthDeviation   = 300000.0
	mediumBandwidth          = 500000.0
	mediumBandwidthDeviation = 80000.0
	slowBandwidth            = 100000.0
	slowBandwidthDeviation   = 16500.0
	stdBlockSize             = 8000
)
115 116

func BenchmarkDupsManyNodesRealWorldNetwork(b *testing.B) {
117
	benchmarkLog = nil
118 119 120 121
	fastNetworkDelayGenerator := tn.InternetLatencyDelayGenerator(
		mediumSpeed-fastSpeed, slowSpeed-fastSpeed,
		0.0, 0.0, distribution, nil)
	fastNetworkDelay := delay.Delay(fastSpeed, fastNetworkDelayGenerator)
122
	fastBandwidthGenerator := tn.VariableRateLimitGenerator(fastBandwidth, fastBandwidthDeviation, nil)
123 124 125 126
	averageNetworkDelayGenerator := tn.InternetLatencyDelayGenerator(
		mediumSpeed-fastSpeed, slowSpeed-fastSpeed,
		0.3, 0.3, distribution, nil)
	averageNetworkDelay := delay.Delay(fastSpeed, averageNetworkDelayGenerator)
127
	averageBandwidthGenerator := tn.VariableRateLimitGenerator(mediumBandwidth, mediumBandwidthDeviation, nil)
128 129 130 131
	slowNetworkDelayGenerator := tn.InternetLatencyDelayGenerator(
		mediumSpeed-fastSpeed, superSlowSpeed-fastSpeed,
		0.3, 0.3, distribution, nil)
	slowNetworkDelay := delay.Delay(fastSpeed, slowNetworkDelayGenerator)
132
	slowBandwidthGenerator := tn.VariableRateLimitGenerator(slowBandwidth, slowBandwidthDeviation, nil)
133 134

	b.Run("200Nodes-AllToAll-BigBatch-FastNetwork", func(b *testing.B) {
135
		subtestDistributeAndFetchRateLimited(b, 300, 200, fastNetworkDelay, fastBandwidthGenerator, stdBlockSize, allToAll, batchFetchAll)
136 137
	})
	b.Run("200Nodes-AllToAll-BigBatch-AverageVariableSpeedNetwork", func(b *testing.B) {
138
		subtestDistributeAndFetchRateLimited(b, 300, 200, averageNetworkDelay, averageBandwidthGenerator, stdBlockSize, allToAll, batchFetchAll)
139 140
	})
	b.Run("200Nodes-AllToAll-BigBatch-SlowVariableSpeedNetwork", func(b *testing.B) {
141
		subtestDistributeAndFetchRateLimited(b, 300, 200, slowNetworkDelay, slowBandwidthGenerator, stdBlockSize, allToAll, batchFetchAll)
142
	})
143 144
	out, _ := json.MarshalIndent(benchmarkLog, "", "  ")
	ioutil.WriteFile("tmp/rw-benchmark.json", out, 0666)
145 146 147
}

func subtestDistributeAndFetch(b *testing.B, numnodes, numblks int, d delay.D, df distFunc, ff fetchFunc) {
148
	start := time.Now()
149
	net := tn.VirtualNetwork(mockrouting.NewServer(), d)
150

151 152
	ig := testinstance.NewTestInstanceGenerator(net)
	defer ig.Close()
153 154 155

	bg := blocksutil.NewBlockGenerator()

156
	instances := ig.Instances(numnodes)
157
	blocks := bg.Blocks(numblks)
158 159 160 161 162 163 164
	runDistribution(b, instances, blocks, df, ff, start)
}

func subtestDistributeAndFetchRateLimited(b *testing.B, numnodes, numblks int, d delay.D, rateLimitGenerator tn.RateLimitGenerator, blockSize int64, df distFunc, ff fetchFunc) {
	start := time.Now()
	net := tn.RateLimitedVirtualNetwork(mockrouting.NewServer(), d, rateLimitGenerator)

165 166
	ig := testinstance.NewTestInstanceGenerator(net)
	defer ig.Close()
167

168
	instances := ig.Instances(numnodes)
169 170 171 172 173
	blocks := testutil.GenerateBlocksOfSize(numblks, blockSize)

	runDistribution(b, instances, blocks, df, ff, start)
}

174
func runDistribution(b *testing.B, instances []testinstance.Instance, blocks []blocks.Block, df distFunc, ff fetchFunc, start time.Time) {
175 176

	numnodes := len(instances)
177 178 179

	fetcher := instances[numnodes-1]

180
	df(b, instances[:numnodes-1], blocks)
181 182 183 184 185 186

	var ks []cid.Cid
	for _, blk := range blocks {
		ks = append(ks, blk.Cid())
	}

187
	ff(b, fetcher.Exchange, ks)
188 189 190

	st, err := fetcher.Exchange.Stat()
	if err != nil {
191
		b.Fatal(err)
192 193
	}

194
	nst := fetcher.Adapter.Stats()
195 196 197 198 199
	stats := runStats{
		Time:    time.Now().Sub(start),
		MsgRecd: nst.MessagesRecvd,
		MsgSent: nst.MessagesSent,
		Dups:    st.DupBlksReceived,
200
		Name:    b.Name(),
201 202
	}
	benchmarkLog = append(benchmarkLog, stats)
203
	b.Logf("send/recv: %d / %d", nst.MessagesSent, nst.MessagesRecvd)
204
	if st.DupBlksReceived != 0 {
205
		b.Fatalf("got %d duplicate blocks!", st.DupBlksReceived)
206 207 208
	}
}

209
func allToAll(b *testing.B, provs []testinstance.Instance, blocks []blocks.Block) {
210 211
	for _, p := range provs {
		if err := p.Blockstore().PutMany(blocks); err != nil {
212
			b.Fatal(err)
213 214 215 216 217 218
		}
	}
}

// overlap1 gives the first 75 blocks to the first peer, and the last 75 blocks
// to the second peer. This means both peers have the middle 50 blocks
219
func overlap1(b *testing.B, provs []testinstance.Instance, blks []blocks.Block) {
220
	if len(provs) != 2 {
221
		b.Fatal("overlap1 only works with 2 provs")
222 223 224 225 226
	}
	bill := provs[0]
	jeff := provs[1]

	if err := bill.Blockstore().PutMany(blks[:75]); err != nil {
227
		b.Fatal(err)
228 229
	}
	if err := jeff.Blockstore().PutMany(blks[25:]); err != nil {
230
		b.Fatal(err)
231 232 233 234 235
	}
}

// overlap2 gives every even numbered block to the first peer, odd numbered
// blocks to the second.  it also gives every third block to both peers
236
func overlap2(b *testing.B, provs []testinstance.Instance, blks []blocks.Block) {
237
	if len(provs) != 2 {
238
		b.Fatal("overlap2 only works with 2 provs")
239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256
	}
	bill := provs[0]
	jeff := provs[1]

	bill.Blockstore().Put(blks[0])
	jeff.Blockstore().Put(blks[0])
	for i, blk := range blks {
		if i%3 == 0 {
			bill.Blockstore().Put(blk)
			jeff.Blockstore().Put(blk)
		} else if i%2 == 1 {
			bill.Blockstore().Put(blk)
		} else {
			jeff.Blockstore().Put(blk)
		}
	}
}

257
func overlap3(b *testing.B, provs []testinstance.Instance, blks []blocks.Block) {
258
	if len(provs) != 2 {
259
		b.Fatal("overlap3 only works with 2 provs")
260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 278 279 280 281
	}

	bill := provs[0]
	jeff := provs[1]

	bill.Blockstore().Put(blks[0])
	jeff.Blockstore().Put(blks[0])
	for i, blk := range blks {
		if i%3 == 0 {
			bill.Blockstore().Put(blk)
			jeff.Blockstore().Put(blk)
		} else if i%2 == 1 {
			bill.Blockstore().Put(blk)
		} else {
			jeff.Blockstore().Put(blk)
		}
	}
}

// onePeerPerBlock picks a random peer to hold each block
// with this layout, we shouldnt actually ever see any duplicate blocks
// but we're mostly just testing performance of the sync algorithm
282
func onePeerPerBlock(b *testing.B, provs []testinstance.Instance, blks []blocks.Block) {
283 284 285 286 287
	for _, blk := range blks {
		provs[rand.Intn(len(provs))].Blockstore().Put(blk)
	}
}

288
func oneAtATime(b *testing.B, bs *bitswap.Bitswap, ks []cid.Cid) {
289
	ses := bs.NewSession(context.Background()).(*bssession.Session)
290 291 292
	for _, c := range ks {
		_, err := ses.GetBlock(context.Background(), c)
		if err != nil {
293
			b.Fatal(err)
294 295
		}
	}
296
	b.Logf("Session fetch latency: %s", ses.GetAverageLatency())
297 298 299
}

// fetch data in batches, 10 at a time
300
func batchFetchBy10(b *testing.B, bs *bitswap.Bitswap, ks []cid.Cid) {
301 302 303 304
	ses := bs.NewSession(context.Background())
	for i := 0; i < len(ks); i += 10 {
		out, err := ses.GetBlocks(context.Background(), ks[i:i+10])
		if err != nil {
305
			b.Fatal(err)
306 307 308 309 310 311 312
		}
		for range out {
		}
	}
}

// fetch each block at the same time concurrently
313
func fetchAllConcurrent(b *testing.B, bs *bitswap.Bitswap, ks []cid.Cid) {
314 315 316 317 318 319 320 321 322
	ses := bs.NewSession(context.Background())

	var wg sync.WaitGroup
	for _, c := range ks {
		wg.Add(1)
		go func(c cid.Cid) {
			defer wg.Done()
			_, err := ses.GetBlock(context.Background(), c)
			if err != nil {
323
				b.Fatal(err)
324 325 326 327 328 329
			}
		}(c)
	}
	wg.Wait()
}

330
func batchFetchAll(b *testing.B, bs *bitswap.Bitswap, ks []cid.Cid) {
331 332 333
	ses := bs.NewSession(context.Background())
	out, err := ses.GetBlocks(context.Background(), ks)
	if err != nil {
334
		b.Fatal(err)
335 336 337 338 339 340
	}
	for range out {
	}
}

// simulates the fetch pattern of trying to sync a unixfs file graph as fast as possible
341
func unixfsFileFetch(b *testing.B, bs *bitswap.Bitswap, ks []cid.Cid) {
342 343 344
	ses := bs.NewSession(context.Background())
	_, err := ses.GetBlock(context.Background(), ks[0])
	if err != nil {
345
		b.Fatal(err)
346 347 348 349
	}

	out, err := ses.GetBlocks(context.Background(), ks[1:11])
	if err != nil {
350
		b.Fatal(err)
351 352 353 354 355 356
	}
	for range out {
	}

	out, err = ses.GetBlocks(context.Background(), ks[11:])
	if err != nil {
357
		b.Fatal(err)
358 359 360 361
	}
	for range out {
	}
}