package bitswap

import (
	"context"
	"encoding/json"
	"io/ioutil"
	"math/rand"
	"sync"
	"testing"
	"time"

	tn "github.com/ipfs/go-bitswap/testnet"

	"github.com/ipfs/go-block-format"
	cid "github.com/ipfs/go-cid"
	blocksutil "github.com/ipfs/go-ipfs-blocksutil"
	delay "github.com/ipfs/go-ipfs-delay"
	mockrouting "github.com/ipfs/go-ipfs-routing/mock"
)

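// fetchFunc fetches the given keys from a Bitswap instance on behalf of a benchmark.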
type fetchFunc func(b *testing.B, bs *Bitswap, ks []cid.Cid)

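// distFunc distributes the given blocks across the provider instances before a fetch.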
type distFunc func(b *testing.B, provs []Instance, blocks []blocks.Block)

// runStats records the measurements captured for a single benchmark run.
type runStats struct {
	Dups    uint64
	MsgSent uint64
	MsgRecd uint64
	Time    time.Duration
	Name    string
}

// benchmarkLog accumulates the stats for every run; BenchmarkDups2Nodes
// writes them out to benchmark.json once all sub-benchmarks have finished.
var benchmarkLog []runStats

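// BenchmarkDups2Nodes runs each distribution/fetch strategy combination as a
// sub-benchmark and writes the accumulated stats to benchmark.json.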
func BenchmarkDups2Nodes(b *testing.B) {
	b.Run("AllToAll-OneAtATime", func(b *testing.B) {
		subtestDistributeAndFetch(b, 3, 100, allToAll, oneAtATime)
	})
	b.Run("AllToAll-BigBatch", func(b *testing.B) {
		subtestDistributeAndFetch(b, 3, 100, allToAll, batchFetchAll)
	})
	b.Run("Overlap1-OneAtATime", func(b *testing.B) {
		subtestDistributeAndFetch(b, 3, 100, overlap1, oneAtATime)
	})
	b.Run("Overlap2-BatchBy10", func(b *testing.B) {
		subtestDistributeAndFetch(b, 3, 100, overlap2, batchFetchBy10)
	})
	b.Run("Overlap3-OneAtATime", func(b *testing.B) {
		subtestDistributeAndFetch(b, 3, 100, overlap3, oneAtATime)
	})
	b.Run("Overlap3-BatchBy10", func(b *testing.B) {
		subtestDistributeAndFetch(b, 3, 100, overlap3, batchFetchBy10)
	})
	b.Run("Overlap3-AllConcurrent", func(b *testing.B) {
		subtestDistributeAndFetch(b, 3, 100, overlap3, fetchAllConcurrent)
	})
	b.Run("Overlap3-BigBatch", func(b *testing.B) {
		subtestDistributeAndFetch(b, 3, 100, overlap3, batchFetchAll)
	})
	b.Run("Overlap3-UnixfsFetch", func(b *testing.B) {
		subtestDistributeAndFetch(b, 3, 100, overlap3, unixfsFileFetch)
	})
	b.Run("10Nodes-AllToAll-OneAtATime", func(b *testing.B) {
		subtestDistributeAndFetch(b, 10, 100, allToAll, oneAtATime)
	})
	b.Run("10Nodes-AllToAll-BatchFetchBy10", func(b *testing.B) {
		subtestDistributeAndFetch(b, 10, 100, allToAll, batchFetchBy10)
	})
	b.Run("10Nodes-AllToAll-BigBatch", func(b *testing.B) {
		subtestDistributeAndFetch(b, 10, 100, allToAll, batchFetchAll)
	})
	b.Run("10Nodes-AllToAll-AllConcurrent", func(b *testing.B) {
		subtestDistributeAndFetch(b, 10, 100, allToAll, fetchAllConcurrent)
	})
	b.Run("10Nodes-AllToAll-UnixfsFetch", func(b *testing.B) {
		subtestDistributeAndFetch(b, 10, 100, allToAll, unixfsFileFetch)
	})
	b.Run("10Nodes-OnePeerPerBlock-OneAtATime", func(b *testing.B) {
		subtestDistributeAndFetch(b, 10, 100, onePeerPerBlock, oneAtATime)
	})
	b.Run("10Nodes-OnePeerPerBlock-BigBatch", func(b *testing.B) {
		subtestDistributeAndFetch(b, 10, 100, onePeerPerBlock, batchFetchAll)
	})
	b.Run("10Nodes-OnePeerPerBlock-UnixfsFetch", func(b *testing.B) {
		subtestDistributeAndFetch(b, 10, 100, onePeerPerBlock, unixfsFileFetch)
	})
	b.Run("200Nodes-AllToAll-BigBatch", func(b *testing.B) {
		subtestDistributeAndFetch(b, 200, 20, allToAll, batchFetchAll)
	})

	out, _ := json.MarshalIndent(benchmarkLog, "", "  ")
	ioutil.WriteFile("benchmark.json", out, 0666)
}

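// subtestDistributeAndFetch spins up numnodes instances on a virtual network,
// distributes numblks blocks to all but the last instance using df, fetches
// them from the last instance using ff, and records the run's stats in
// benchmarkLog. It fails if any duplicate blocks were received.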
func subtestDistributeAndFetch(b *testing.B, numnodes, numblks int, df distFunc, ff fetchFunc) {
	start := time.Now()
	net := tn.VirtualNetwork(mockrouting.NewServer(), delay.Fixed(10*time.Millisecond))
	sg := NewTestSessionGenerator(net)
	defer sg.Close()

	bg := blocksutil.NewBlockGenerator()

	instances := sg.Instances(numnodes)
	blocks := bg.Blocks(numblks)

	fetcher := instances[numnodes-1]

	df(b, instances[:numnodes-1], blocks)

	var ks []cid.Cid
	for _, blk := range blocks {
		ks = append(ks, blk.Cid())
	}

	ff(b, fetcher.Exchange, ks)

	st, err := fetcher.Exchange.Stat()
	if err != nil {
		b.Fatal(err)
	}

	nst := fetcher.Exchange.network.Stats()
	stats := runStats{
		Time:    time.Since(start),
		MsgRecd: nst.MessagesRecvd,
		MsgSent: nst.MessagesSent,
		Dups:    st.DupBlksReceived,
		Name:    b.Name(),
	}
	benchmarkLog = append(benchmarkLog, stats)
	b.Logf("send/recv: %d / %d", nst.MessagesSent, nst.MessagesRecvd)
	if st.DupBlksReceived != 0 {
		b.Fatalf("got %d duplicate blocks!", st.DupBlksReceived)
	}
}

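// allToAll puts every block into every provider's blockstore.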
func allToAll(b *testing.B, provs []Instance, blocks []blocks.Block) {
	for _, p := range provs {
		if err := p.Blockstore().PutMany(blocks); err != nil {
			b.Fatal(err)
		}
	}
}

// overlap1 gives the first 75 blocks to the first peer and the last 75 blocks
// to the second peer, so both peers hold the middle 50 blocks (of 100).
func overlap1(b *testing.B, provs []Instance, blks []blocks.Block) {
	if len(provs) != 2 {
		b.Fatal("overlap1 only works with 2 provs")
	}
	bill := provs[0]
	jeff := provs[1]

	if err := bill.Blockstore().PutMany(blks[:75]); err != nil {
		b.Fatal(err)
	}
	if err := jeff.Blockstore().PutMany(blks[25:]); err != nil {
		b.Fatal(err)
	}
}

// overlap2 gives every third block to both peers; of the remaining blocks,
// the odd-indexed ones go to the first peer and the even-indexed ones to the
// second.
func overlap2(b *testing.B, provs []Instance, blks []blocks.Block) {
	if len(provs) != 2 {
		b.Fatal("overlap2 only works with 2 provs")
	}
	bill := provs[0]
	jeff := provs[1]

	bill.Blockstore().Put(blks[0])
	jeff.Blockstore().Put(blks[0])
	for i, blk := range blks {
		if i%3 == 0 {
			bill.Blockstore().Put(blk)
			jeff.Blockstore().Put(blk)
		} else if i%2 == 1 {
			bill.Blockstore().Put(blk)
		} else {
			jeff.Blockstore().Put(blk)
		}
	}
}

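// overlap3 distributes the blocks exactly like overlap2: every third block
// goes to both peers, and the remaining blocks are split between them by
// index parity.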
func overlap3(b *testing.B, provs []Instance, blks []blocks.Block) {
	if len(provs) != 2 {
		b.Fatal("overlap3 only works with 2 provs")
	}

	bill := provs[0]
	jeff := provs[1]

	bill.Blockstore().Put(blks[0])
	jeff.Blockstore().Put(blks[0])
	for i, blk := range blks {
		if i%3 == 0 {
			bill.Blockstore().Put(blk)
			jeff.Blockstore().Put(blk)
		} else if i%2 == 1 {
			bill.Blockstore().Put(blk)
		} else {
			jeff.Blockstore().Put(blk)
		}
	}
}

// onePeerPerBlock picks a random peer to hold each block. With this layout we
// shouldn't actually ever see any duplicate blocks, but we're mostly just
// testing performance of the sync algorithm.
func onePeerPerBlock(b *testing.B, provs []Instance, blks []blocks.Block) {
	for _, blk := range blks {
		provs[rand.Intn(len(provs))].Blockstore().Put(blk)
	}
}

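// oneAtATime fetches each block with its own GetBlock call on a single
// session and logs the average per-fetch latency.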
func oneAtATime(b *testing.B, bs *Bitswap, ks []cid.Cid) {
	ses := bs.NewSession(context.Background()).(*Session)
	for _, c := range ks {
		_, err := ses.GetBlock(context.Background(), c)
		if err != nil {
			b.Fatal(err)
		}
	}
	b.Logf("Session fetch latency: %s", ses.latTotal/time.Duration(ses.fetchcnt))
}

// batchFetchBy10 fetches the blocks in batches of 10 at a time.
func batchFetchBy10(b *testing.B, bs *Bitswap, ks []cid.Cid) {
	ses := bs.NewSession(context.Background())
	for i := 0; i < len(ks); i += 10 {
		// Guard against a key count that is not a multiple of 10.
		end := i + 10
		if end > len(ks) {
			end = len(ks)
		}
		out, err := ses.GetBlocks(context.Background(), ks[i:end])
		if err != nil {
			b.Fatal(err)
		}
		for range out {
		}
	}
}

// fetchAllConcurrent fetches every block in its own goroutine over a single session.
func fetchAllConcurrent(b *testing.B, bs *Bitswap, ks []cid.Cid) {
	ses := bs.NewSession(context.Background())

	var wg sync.WaitGroup
	for _, c := range ks {
		wg.Add(1)
		go func(c cid.Cid) {
			defer wg.Done()
			_, err := ses.GetBlock(context.Background(), c)
			if err != nil {
				// b.Fatal must not be called from a goroutine other than the one
				// running the benchmark; record the error and fail after wg.Wait().
				b.Error(err)
			}
		}(c)
	}
	wg.Wait()
}

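// batchFetchAll requests every block in a single GetBlocks call and drains
// the resulting channel.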
func batchFetchAll(b *testing.B, bs *Bitswap, ks []cid.Cid) {
	ses := bs.NewSession(context.Background())
	out, err := ses.GetBlocks(context.Background(), ks)
	if err != nil {
		b.Fatal(err)
	}
	for range out {
	}
}

// unixfsFileFetch simulates the fetch pattern of trying to sync a unixfs file
// graph as fast as possible.
func unixfsFileFetch(b *testing.B, bs *Bitswap, ks []cid.Cid) {
	ses := bs.NewSession(context.Background())
	_, err := ses.GetBlock(context.Background(), ks[0])
	if err != nil {
		b.Fatal(err)
	}

	out, err := ses.GetBlocks(context.Background(), ks[1:11])
	if err != nil {
		b.Fatal(err)
	}
	for range out {
	}

	out, err = ses.GetBlocks(context.Background(), ks[11:])
	if err != nil {
		b.Fatal(err)
	}
	for range out {
	}
}