bitswap_test.go 10.6 KB
Newer Older
1
package bitswap
Brian Tiger Chow's avatar
Brian Tiger Chow committed
2 3

import (
4
	"bytes"
5
	"context"
6
	"sync"
Brian Tiger Chow's avatar
Brian Tiger Chow committed
7 8 9
	"testing"
	"time"

10
	blocks "github.com/ipfs/go-ipfs/blocks"
jbenet's avatar
jbenet committed
11
	blockstore "github.com/ipfs/go-ipfs/blocks/blockstore"
12 13 14 15
	blocksutil "github.com/ipfs/go-ipfs/blocks/blocksutil"
	tn "github.com/ipfs/go-ipfs/exchange/bitswap/testnet"
	mockrouting "github.com/ipfs/go-ipfs/routing/mock"
	delay "github.com/ipfs/go-ipfs/thirdparty/delay"
16 17 18
	travis "github.com/ipfs/go-ipfs/thirdparty/testutil/ci/travis"

	detectrace "github.com/ipfs/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-detect-race"
19 20

	p2ptestutil "gx/ipfs/QmcDTquYLTYirqj71RRWKUWEEw3nJt11Awzun5ep8kfY7W/go-libp2p-netutil"
21
	cid "gx/ipfs/QmcEcrBAMrwMyhSjXt4yfyPpzgSuV8HLHavnfmiKCSRqZU/go-cid"
Brian Tiger Chow's avatar
Brian Tiger Chow committed
22 23
)

Brian Tiger Chow's avatar
Brian Tiger Chow committed
24 25
// FIXME the tests are really sensitive to the network delay. fix them to work
// well under varying conditions
26 27
const kNetworkDelay = 0 * time.Millisecond

28 29 30 31
func getVirtualNetwork() tn.Network {
	return tn.VirtualNetwork(mockrouting.NewServer(), delay.Fixed(kNetworkDelay))
}

32
func TestClose(t *testing.T) {
33
	vnet := getVirtualNetwork()
34
	sesgen := NewTestSessionGenerator(vnet)
Jeromy's avatar
Jeromy committed
35
	defer sesgen.Close()
36
	bgen := blocksutil.NewBlockGenerator()
37 38 39 40

	block := bgen.Next()
	bitswap := sesgen.Next()

41
	bitswap.Exchange.Close()
42
	bitswap.Exchange.GetBlock(context.Background(), block.Cid())
43 44
}

Jeromy's avatar
Jeromy committed
45 46 47 48 49 50 51 52 53
func TestProviderForKeyButNetworkCannotFind(t *testing.T) { // TODO revisit this

	rs := mockrouting.NewServer()
	net := tn.VirtualNetwork(rs, delay.Fixed(kNetworkDelay))
	g := NewTestSessionGenerator(net)
	defer g.Close()

	block := blocks.NewBlock([]byte("block"))
	pinfo := p2ptestutil.RandTestBogusIdentityOrFatal(t)
54
	rs.Client(pinfo).Provide(context.Background(), block.Cid()) // but not on network
Jeromy's avatar
Jeromy committed
55 56 57 58

	solo := g.Next()
	defer solo.Exchange.Close()

rht's avatar
rht committed
59 60
	ctx, cancel := context.WithTimeout(context.Background(), time.Nanosecond)
	defer cancel()
61
	_, err := solo.Exchange.GetBlock(ctx, block.Cid())
Jeromy's avatar
Jeromy committed
62 63 64 65 66 67

	if err != context.DeadlineExceeded {
		t.Fatal("Expected DeadlineExceeded error")
	}
}

Brian Tiger Chow's avatar
Brian Tiger Chow committed
68 69
func TestGetBlockFromPeerAfterPeerAnnounces(t *testing.T) {

70
	net := tn.VirtualNetwork(mockrouting.NewServer(), delay.Fixed(kNetworkDelay))
71
	block := blocks.NewBlock([]byte("block"))
72
	g := NewTestSessionGenerator(net)
Jeromy's avatar
Jeromy committed
73
	defer g.Close()
Brian Tiger Chow's avatar
Brian Tiger Chow committed
74

75 76
	peers := g.Instances(2)
	hasBlock := peers[0]
Jeromy's avatar
Jeromy committed
77
	defer hasBlock.Exchange.Close()
Brian Tiger Chow's avatar
Brian Tiger Chow committed
78

79
	if err := hasBlock.Exchange.HasBlock(block); err != nil {
80 81
		t.Fatal(err)
	}
Brian Tiger Chow's avatar
Brian Tiger Chow committed
82

83
	wantsBlock := peers[1]
Jeromy's avatar
Jeromy committed
84
	defer wantsBlock.Exchange.Close()
Brian Tiger Chow's avatar
Brian Tiger Chow committed
85

rht's avatar
rht committed
86 87
	ctx, cancel := context.WithTimeout(context.Background(), time.Second)
	defer cancel()
88
	received, err := wantsBlock.Exchange.GetBlock(ctx, block.Cid())
Brian Tiger Chow's avatar
Brian Tiger Chow committed
89 90 91 92
	if err != nil {
		t.Log(err)
		t.Fatal("Expected to succeed")
	}
93

Jeromy's avatar
Jeromy committed
94
	if !bytes.Equal(block.RawData(), received.RawData()) {
95 96
		t.Fatal("Data doesn't match")
	}
Brian Tiger Chow's avatar
Brian Tiger Chow committed
97 98
}

99
func TestLargeSwarm(t *testing.T) {
100 101 102
	if testing.Short() {
		t.SkipNow()
	}
103
	numInstances := 100
104
	numBlocks := 2
105 106 107 108
	if detectrace.WithRace() {
		// when running with the race detector, 500 instances launches
		// well over 8k goroutines. This hits a race detector limit.
		numInstances = 100
109 110
	} else if travis.IsRunning() {
		numInstances = 200
111 112 113
	} else {
		t.Parallel()
	}
114 115
	PerformDistributionTest(t, numInstances, numBlocks)
}
116

117 118 119
func TestLargeFile(t *testing.T) {
	if testing.Short() {
		t.SkipNow()
120
	}
121 122 123 124 125

	if !travis.IsRunning() {
		t.Parallel()
	}

126 127 128
	numInstances := 10
	numBlocks := 100
	PerformDistributionTest(t, numInstances, numBlocks)
129 130
}

131 132 133 134 135 136 137 138 139 140 141 142
func TestLargeFileNoRebroadcast(t *testing.T) {
	rbd := rebroadcastDelay.Get()
	rebroadcastDelay.Set(time.Hour * 24 * 365 * 10) // ten years should be long enough
	if testing.Short() {
		t.SkipNow()
	}
	numInstances := 10
	numBlocks := 100
	PerformDistributionTest(t, numInstances, numBlocks)
	rebroadcastDelay.Set(rbd)
}

143 144 145 146 147 148 149 150 151
func TestLargeFileTwoPeers(t *testing.T) {
	if testing.Short() {
		t.SkipNow()
	}
	numInstances := 2
	numBlocks := 100
	PerformDistributionTest(t, numInstances, numBlocks)
}

152
func PerformDistributionTest(t *testing.T, numInstances, numBlocks int) {
153
	ctx := context.Background()
154 155 156
	if testing.Short() {
		t.SkipNow()
	}
157
	net := tn.VirtualNetwork(mockrouting.NewServer(), delay.Fixed(kNetworkDelay))
158
	sg := NewTestSessionGenerator(net)
Jeromy's avatar
Jeromy committed
159
	defer sg.Close()
160
	bg := blocksutil.NewBlockGenerator()
161 162 163 164 165 166

	instances := sg.Instances(numInstances)
	blocks := bg.Blocks(numBlocks)

	t.Log("Give the blocks to the first instance")

167 168 169 170 171 172 173 174 175 176 177 178 179
	nump := len(instances) - 1
	// assert we're properly connected
	for _, inst := range instances {
		peers := inst.Exchange.wm.ConnectedPeers()
		for i := 0; i < 10 && len(peers) != nump; i++ {
			time.Sleep(time.Millisecond * 50)
			peers = inst.Exchange.wm.ConnectedPeers()
		}
		if len(peers) != nump {
			t.Fatal("not enough peers connected to instance")
		}
	}

180
	var blkeys []*cid.Cid
181 182
	first := instances[0]
	for _, b := range blocks {
183
		blkeys = append(blkeys, b.Cid())
184
		first.Exchange.HasBlock(b)
185 186 187 188
	}

	t.Log("Distribute!")

Jeromy's avatar
Jeromy committed
189
	wg := sync.WaitGroup{}
190 191
	errs := make(chan error)

192
	for _, inst := range instances[1:] {
Jeromy's avatar
Jeromy committed
193 194 195
		wg.Add(1)
		go func(inst Instance) {
			defer wg.Done()
196
			outch, err := inst.Exchange.GetBlocks(ctx, blkeys)
Jeromy's avatar
Jeromy committed
197
			if err != nil {
198
				errs <- err
Jeromy's avatar
Jeromy committed
199 200 201 202
			}
			for _ = range outch {
			}
		}(inst)
203
	}
204 205 206 207 208 209 210 211 212 213 214

	go func() {
		wg.Wait()
		close(errs)
	}()

	for err := range errs {
		if err != nil {
			t.Fatal(err)
		}
	}
215 216 217 218 219

	t.Log("Verify!")

	for _, inst := range instances {
		for _, b := range blocks {
220
			if _, err := inst.Blockstore().Get(b.Cid()); err != nil {
221 222 223 224 225 226
				t.Fatal(err)
			}
		}
	}
}

227
func getOrFail(bitswap Instance, b blocks.Block, t *testing.T, wg *sync.WaitGroup) {
228 229
	if _, err := bitswap.Blockstore().Get(b.Cid()); err != nil {
		_, err := bitswap.Exchange.GetBlock(context.Background(), b.Cid())
230 231 232 233 234 235 236
		if err != nil {
			t.Fatal(err)
		}
	}
	wg.Done()
}

237
// TODO simplify this test. get to the _essence_!
238
func TestSendToWantingPeer(t *testing.T) {
Brian Tiger Chow's avatar
Brian Tiger Chow committed
239 240 241 242
	if testing.Short() {
		t.SkipNow()
	}

243
	net := tn.VirtualNetwork(mockrouting.NewServer(), delay.Fixed(kNetworkDelay))
244
	sg := NewTestSessionGenerator(net)
Jeromy's avatar
Jeromy committed
245
	defer sg.Close()
246
	bg := blocksutil.NewBlockGenerator()
247

Brian Tiger Chow's avatar
Brian Tiger Chow committed
248 249
	prev := rebroadcastDelay.Set(time.Second / 2)
	defer func() { rebroadcastDelay.Set(prev) }()
250

251 252 253
	peers := sg.Instances(2)
	peerA := peers[0]
	peerB := peers[1]
254

255 256
	t.Logf("Session %v\n", peerA.Peer)
	t.Logf("Session %v\n", peerB.Peer)
257

258
	waitTime := time.Second * 5
259

260 261
	alpha := bg.Next()
	// peerA requests and waits for block alpha
262
	ctx, cancel := context.WithTimeout(context.Background(), waitTime)
rht's avatar
rht committed
263
	defer cancel()
264
	alphaPromise, err := peerA.Exchange.GetBlocks(ctx, []*cid.Cid{alpha.Cid()})
265
	if err != nil {
266 267
		t.Fatal(err)
	}
268

269
	// peerB announces to the network that he has block alpha
270
	err = peerB.Exchange.HasBlock(alpha)
271
	if err != nil {
272 273
		t.Fatal(err)
	}
274

275 276 277 278
	// At some point, peerA should get alpha (or timeout)
	blkrecvd, ok := <-alphaPromise
	if !ok {
		t.Fatal("context timed out and broke promise channel!")
279
	}
280

281
	if !blkrecvd.Cid().Equals(alpha.Cid()) {
282
		t.Fatal("Wrong block!")
283
	}
284

285
}
Jeromy's avatar
Jeromy committed
286

jbenet's avatar
jbenet committed
287 288 289 290 291 292
func TestEmptyKey(t *testing.T) {
	net := tn.VirtualNetwork(mockrouting.NewServer(), delay.Fixed(kNetworkDelay))
	sg := NewTestSessionGenerator(net)
	defer sg.Close()
	bs := sg.Instances(1)[0].Exchange

Jeromy's avatar
Jeromy committed
293 294 295
	ctx, cancel := context.WithTimeout(context.Background(), time.Second*5)
	defer cancel()

296
	_, err := bs.GetBlock(ctx, nil)
jbenet's avatar
jbenet committed
297 298 299 300 301
	if err != blockstore.ErrNotFound {
		t.Error("empty str key should return ErrNotFound")
	}
}

Jeromy's avatar
Jeromy committed
302
func TestBasicBitswap(t *testing.T) {
303
	net := tn.VirtualNetwork(mockrouting.NewServer(), delay.Fixed(kNetworkDelay))
304
	sg := NewTestSessionGenerator(net)
305
	defer sg.Close()
Jeromy's avatar
Jeromy committed
306 307
	bg := blocksutil.NewBlockGenerator()

308
	t.Log("Test a one node trying to get one block from another")
Jeromy's avatar
Jeromy committed
309 310 311

	instances := sg.Instances(2)
	blocks := bg.Blocks(1)
312
	err := instances[0].Exchange.HasBlock(blocks[0])
Jeromy's avatar
Jeromy committed
313 314 315 316
	if err != nil {
		t.Fatal(err)
	}

317
	ctx, cancel := context.WithTimeout(context.Background(), time.Second*5)
rht's avatar
rht committed
318
	defer cancel()
319
	blk, err := instances[1].Exchange.GetBlock(ctx, blocks[0].Cid())
Jeromy's avatar
Jeromy committed
320 321 322 323 324 325 326 327 328 329 330 331
	if err != nil {
		t.Fatal(err)
	}

	t.Log(blk)
	for _, inst := range instances {
		err := inst.Exchange.Close()
		if err != nil {
			t.Fatal(err)
		}
	}
}
Jeromy's avatar
Jeromy committed
332 333 334 335 336 337 338 339 340 341 342 343 344

func TestDoubleGet(t *testing.T) {
	net := tn.VirtualNetwork(mockrouting.NewServer(), delay.Fixed(kNetworkDelay))
	sg := NewTestSessionGenerator(net)
	defer sg.Close()
	bg := blocksutil.NewBlockGenerator()

	t.Log("Test a one node trying to get one block from another")

	instances := sg.Instances(2)
	blocks := bg.Blocks(1)

	ctx1, cancel1 := context.WithCancel(context.Background())
345
	blkch1, err := instances[1].Exchange.GetBlocks(ctx1, []*cid.Cid{blocks[0].Cid()})
Jeromy's avatar
Jeromy committed
346 347 348 349 350 351 352
	if err != nil {
		t.Fatal(err)
	}

	ctx2, cancel2 := context.WithCancel(context.Background())
	defer cancel2()

353
	blkch2, err := instances[1].Exchange.GetBlocks(ctx2, []*cid.Cid{blocks[0].Cid()})
Jeromy's avatar
Jeromy committed
354 355 356 357
	if err != nil {
		t.Fatal(err)
	}

358 359
	// ensure both requests make it into the wantlist at the same time
	time.Sleep(time.Millisecond * 100)
Jeromy's avatar
Jeromy committed
360 361 362 363 364 365 366 367 368 369 370 371
	cancel1()

	_, ok := <-blkch1
	if ok {
		t.Fatal("expected channel to be closed")
	}

	err = instances[0].Exchange.HasBlock(blocks[0])
	if err != nil {
		t.Fatal(err)
	}

372 373 374 375 376 377 378 379
	select {
	case blk, ok := <-blkch2:
		if !ok {
			t.Fatal("expected to get the block here")
		}
		t.Log(blk)
	case <-time.After(time.Second * 5):
		t.Fatal("timed out waiting on block")
Jeromy's avatar
Jeromy committed
380 381 382 383 384 385 386 387 388
	}

	for _, inst := range instances {
		err := inst.Exchange.Close()
		if err != nil {
			t.Fatal(err)
		}
	}
}
389 390 391 392 393 394 395 396 397 398 399

func TestWantlistCleanup(t *testing.T) {
	net := tn.VirtualNetwork(mockrouting.NewServer(), delay.Fixed(kNetworkDelay))
	sg := NewTestSessionGenerator(net)
	defer sg.Close()
	bg := blocksutil.NewBlockGenerator()

	instances := sg.Instances(1)[0]
	bswap := instances.Exchange
	blocks := bg.Blocks(20)

400
	var keys []*cid.Cid
401
	for _, b := range blocks {
402
		keys = append(keys, b.Cid())
403 404 405 406 407 408 409 410 411 412 413 414 415 416 417 418 419 420 421 422 423 424 425 426 427 428 429 430 431 432 433 434 435 436 437 438 439 440 441 442 443 444 445 446 447 448 449 450 451 452 453
	}

	ctx, cancel := context.WithTimeout(context.Background(), time.Millisecond*50)
	defer cancel()
	_, err := bswap.GetBlock(ctx, keys[0])
	if err != context.DeadlineExceeded {
		t.Fatal("shouldnt have fetched any blocks")
	}

	time.Sleep(time.Millisecond * 50)

	if len(bswap.GetWantlist()) > 0 {
		t.Fatal("should not have anyting in wantlist")
	}

	ctx, cancel = context.WithTimeout(context.Background(), time.Millisecond*50)
	defer cancel()
	_, err = bswap.GetBlocks(ctx, keys[:10])
	if err != nil {
		t.Fatal(err)
	}

	<-ctx.Done()
	time.Sleep(time.Millisecond * 50)

	if len(bswap.GetWantlist()) > 0 {
		t.Fatal("should not have anyting in wantlist")
	}

	_, err = bswap.GetBlocks(context.Background(), keys[:1])
	if err != nil {
		t.Fatal(err)
	}

	ctx, cancel = context.WithCancel(context.Background())
	_, err = bswap.GetBlocks(ctx, keys[10:])
	if err != nil {
		t.Fatal(err)
	}

	time.Sleep(time.Millisecond * 50)
	if len(bswap.GetWantlist()) != 11 {
		t.Fatal("should have 11 keys in wantlist")
	}

	cancel()
	time.Sleep(time.Millisecond * 50)
	if !(len(bswap.GetWantlist()) == 1 && bswap.GetWantlist()[0] == keys[0]) {
		t.Fatal("should only have keys[0] in wantlist")
	}
}