bitswap_test.go 10.6 KB
Newer Older
1
package bitswap
Brian Tiger Chow's avatar
Brian Tiger Chow committed
2 3

import (
4
	"bytes"
5
	"sync"
Brian Tiger Chow's avatar
Brian Tiger Chow committed
6 7 8
	"testing"
	"time"

9
	context "context"
10
	detectrace "github.com/ipfs/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-detect-race"
11
	travis "github.com/ipfs/go-ipfs/thirdparty/testutil/ci/travis"
12 13

	blocks "github.com/ipfs/go-ipfs/blocks"
jbenet's avatar
jbenet committed
14
	blockstore "github.com/ipfs/go-ipfs/blocks/blockstore"
15 16 17 18
	blocksutil "github.com/ipfs/go-ipfs/blocks/blocksutil"
	tn "github.com/ipfs/go-ipfs/exchange/bitswap/testnet"
	mockrouting "github.com/ipfs/go-ipfs/routing/mock"
	delay "github.com/ipfs/go-ipfs/thirdparty/delay"
19 20
	key "gx/ipfs/QmYEoKZXHoAToWfhGF3vryhMn3WWhE1o2MasQ8uzY5iDi9/go-key"
	p2ptestutil "gx/ipfs/QmcRa2qn6iCmap9bjp8jAwkvYAq13AUfxdY3rrYiaJbLum/go-libp2p/p2p/test/util"
Brian Tiger Chow's avatar
Brian Tiger Chow committed
21 22
)

Brian Tiger Chow's avatar
Brian Tiger Chow committed
23 24
// FIXME the tests are really sensitive to the network delay. fix them to work
// well under varying conditions
25 26
const kNetworkDelay = 0 * time.Millisecond

27 28 29 30
func getVirtualNetwork() tn.Network {
	return tn.VirtualNetwork(mockrouting.NewServer(), delay.Fixed(kNetworkDelay))
}

31
func TestClose(t *testing.T) {
32
	vnet := getVirtualNetwork()
33
	sesgen := NewTestSessionGenerator(vnet)
Jeromy's avatar
Jeromy committed
34
	defer sesgen.Close()
35
	bgen := blocksutil.NewBlockGenerator()
36 37 38 39

	block := bgen.Next()
	bitswap := sesgen.Next()

40 41
	bitswap.Exchange.Close()
	bitswap.Exchange.GetBlock(context.Background(), block.Key())
42 43
}

Jeromy's avatar
Jeromy committed
44 45 46 47 48 49 50 51 52
func TestProviderForKeyButNetworkCannotFind(t *testing.T) { // TODO revisit this

	rs := mockrouting.NewServer()
	net := tn.VirtualNetwork(rs, delay.Fixed(kNetworkDelay))
	g := NewTestSessionGenerator(net)
	defer g.Close()

	block := blocks.NewBlock([]byte("block"))
	pinfo := p2ptestutil.RandTestBogusIdentityOrFatal(t)
53
	rs.Client(pinfo).Provide(context.Background(), block.Cid()) // but not on network
Jeromy's avatar
Jeromy committed
54 55 56 57

	solo := g.Next()
	defer solo.Exchange.Close()

rht's avatar
rht committed
58 59
	ctx, cancel := context.WithTimeout(context.Background(), time.Nanosecond)
	defer cancel()
Jeromy's avatar
Jeromy committed
60 61 62 63 64 65 66
	_, err := solo.Exchange.GetBlock(ctx, block.Key())

	if err != context.DeadlineExceeded {
		t.Fatal("Expected DeadlineExceeded error")
	}
}

Brian Tiger Chow's avatar
Brian Tiger Chow committed
67 68
func TestGetBlockFromPeerAfterPeerAnnounces(t *testing.T) {

69
	net := tn.VirtualNetwork(mockrouting.NewServer(), delay.Fixed(kNetworkDelay))
70
	block := blocks.NewBlock([]byte("block"))
71
	g := NewTestSessionGenerator(net)
Jeromy's avatar
Jeromy committed
72
	defer g.Close()
Brian Tiger Chow's avatar
Brian Tiger Chow committed
73

74 75
	peers := g.Instances(2)
	hasBlock := peers[0]
Jeromy's avatar
Jeromy committed
76
	defer hasBlock.Exchange.Close()
Brian Tiger Chow's avatar
Brian Tiger Chow committed
77

78
	if err := hasBlock.Exchange.HasBlock(block); err != nil {
79 80
		t.Fatal(err)
	}
Brian Tiger Chow's avatar
Brian Tiger Chow committed
81

82
	wantsBlock := peers[1]
Jeromy's avatar
Jeromy committed
83
	defer wantsBlock.Exchange.Close()
Brian Tiger Chow's avatar
Brian Tiger Chow committed
84

rht's avatar
rht committed
85 86
	ctx, cancel := context.WithTimeout(context.Background(), time.Second)
	defer cancel()
87
	received, err := wantsBlock.Exchange.GetBlock(ctx, block.Key())
Brian Tiger Chow's avatar
Brian Tiger Chow committed
88 89 90 91
	if err != nil {
		t.Log(err)
		t.Fatal("Expected to succeed")
	}
92

Jeromy's avatar
Jeromy committed
93
	if !bytes.Equal(block.RawData(), received.RawData()) {
94 95
		t.Fatal("Data doesn't match")
	}
Brian Tiger Chow's avatar
Brian Tiger Chow committed
96 97
}

98
func TestLargeSwarm(t *testing.T) {
99 100 101
	if testing.Short() {
		t.SkipNow()
	}
102
	numInstances := 100
103
	numBlocks := 2
104 105 106 107
	if detectrace.WithRace() {
		// when running with the race detector, 500 instances launches
		// well over 8k goroutines. This hits a race detector limit.
		numInstances = 100
108 109
	} else if travis.IsRunning() {
		numInstances = 200
110 111 112
	} else {
		t.Parallel()
	}
113 114
	PerformDistributionTest(t, numInstances, numBlocks)
}
115

116 117 118
func TestLargeFile(t *testing.T) {
	if testing.Short() {
		t.SkipNow()
119
	}
120 121 122 123 124

	if !travis.IsRunning() {
		t.Parallel()
	}

125 126 127
	numInstances := 10
	numBlocks := 100
	PerformDistributionTest(t, numInstances, numBlocks)
128 129
}

130 131 132 133 134 135 136 137 138 139 140 141
func TestLargeFileNoRebroadcast(t *testing.T) {
	rbd := rebroadcastDelay.Get()
	rebroadcastDelay.Set(time.Hour * 24 * 365 * 10) // ten years should be long enough
	if testing.Short() {
		t.SkipNow()
	}
	numInstances := 10
	numBlocks := 100
	PerformDistributionTest(t, numInstances, numBlocks)
	rebroadcastDelay.Set(rbd)
}

142 143 144 145 146 147 148 149 150
func TestLargeFileTwoPeers(t *testing.T) {
	if testing.Short() {
		t.SkipNow()
	}
	numInstances := 2
	numBlocks := 100
	PerformDistributionTest(t, numInstances, numBlocks)
}

151
func PerformDistributionTest(t *testing.T, numInstances, numBlocks int) {
152
	ctx := context.Background()
153 154 155
	if testing.Short() {
		t.SkipNow()
	}
156
	net := tn.VirtualNetwork(mockrouting.NewServer(), delay.Fixed(kNetworkDelay))
157
	sg := NewTestSessionGenerator(net)
Jeromy's avatar
Jeromy committed
158
	defer sg.Close()
159
	bg := blocksutil.NewBlockGenerator()
160 161 162 163 164 165

	instances := sg.Instances(numInstances)
	blocks := bg.Blocks(numBlocks)

	t.Log("Give the blocks to the first instance")

166 167 168 169 170 171 172 173 174 175 176 177 178
	nump := len(instances) - 1
	// assert we're properly connected
	for _, inst := range instances {
		peers := inst.Exchange.wm.ConnectedPeers()
		for i := 0; i < 10 && len(peers) != nump; i++ {
			time.Sleep(time.Millisecond * 50)
			peers = inst.Exchange.wm.ConnectedPeers()
		}
		if len(peers) != nump {
			t.Fatal("not enough peers connected to instance")
		}
	}

179
	var blkeys []key.Key
180 181
	first := instances[0]
	for _, b := range blocks {
Jeromy's avatar
Jeromy committed
182
		blkeys = append(blkeys, b.Key())
183
		first.Exchange.HasBlock(b)
184 185 186 187
	}

	t.Log("Distribute!")

Jeromy's avatar
Jeromy committed
188
	wg := sync.WaitGroup{}
189 190
	errs := make(chan error)

191
	for _, inst := range instances[1:] {
Jeromy's avatar
Jeromy committed
192 193 194
		wg.Add(1)
		go func(inst Instance) {
			defer wg.Done()
195
			outch, err := inst.Exchange.GetBlocks(ctx, blkeys)
Jeromy's avatar
Jeromy committed
196
			if err != nil {
197
				errs <- err
Jeromy's avatar
Jeromy committed
198 199 200 201
			}
			for _ = range outch {
			}
		}(inst)
202
	}
203 204 205 206 207 208 209 210 211 212 213

	go func() {
		wg.Wait()
		close(errs)
	}()

	for err := range errs {
		if err != nil {
			t.Fatal(err)
		}
	}
214 215 216 217 218

	t.Log("Verify!")

	for _, inst := range instances {
		for _, b := range blocks {
219
			if _, err := inst.Blockstore().Get(b.Key()); err != nil {
220 221 222 223 224 225
				t.Fatal(err)
			}
		}
	}
}

226
func getOrFail(bitswap Instance, b blocks.Block, t *testing.T, wg *sync.WaitGroup) {
227
	if _, err := bitswap.Blockstore().Get(b.Key()); err != nil {
228
		_, err := bitswap.Exchange.GetBlock(context.Background(), b.Key())
229 230 231 232 233 234 235
		if err != nil {
			t.Fatal(err)
		}
	}
	wg.Done()
}

236
// TODO simplify this test. get to the _essence_!
237
func TestSendToWantingPeer(t *testing.T) {
Brian Tiger Chow's avatar
Brian Tiger Chow committed
238 239 240 241
	if testing.Short() {
		t.SkipNow()
	}

242
	net := tn.VirtualNetwork(mockrouting.NewServer(), delay.Fixed(kNetworkDelay))
243
	sg := NewTestSessionGenerator(net)
Jeromy's avatar
Jeromy committed
244
	defer sg.Close()
245
	bg := blocksutil.NewBlockGenerator()
246

Brian Tiger Chow's avatar
Brian Tiger Chow committed
247 248
	prev := rebroadcastDelay.Set(time.Second / 2)
	defer func() { rebroadcastDelay.Set(prev) }()
249

250 251 252
	peers := sg.Instances(2)
	peerA := peers[0]
	peerB := peers[1]
253

254 255
	t.Logf("Session %v\n", peerA.Peer)
	t.Logf("Session %v\n", peerB.Peer)
256

257
	waitTime := time.Second * 5
258

259 260
	alpha := bg.Next()
	// peerA requests and waits for block alpha
261
	ctx, cancel := context.WithTimeout(context.Background(), waitTime)
rht's avatar
rht committed
262
	defer cancel()
263
	alphaPromise, err := peerA.Exchange.GetBlocks(ctx, []key.Key{alpha.Key()})
264
	if err != nil {
265 266
		t.Fatal(err)
	}
267

268
	// peerB announces to the network that he has block alpha
269
	err = peerB.Exchange.HasBlock(alpha)
270
	if err != nil {
271 272
		t.Fatal(err)
	}
273

274 275 276 277
	// At some point, peerA should get alpha (or timeout)
	blkrecvd, ok := <-alphaPromise
	if !ok {
		t.Fatal("context timed out and broke promise channel!")
278
	}
279

280 281
	if blkrecvd.Key() != alpha.Key() {
		t.Fatal("Wrong block!")
282
	}
283

284
}
Jeromy's avatar
Jeromy committed
285

jbenet's avatar
jbenet committed
286 287 288 289 290 291
func TestEmptyKey(t *testing.T) {
	net := tn.VirtualNetwork(mockrouting.NewServer(), delay.Fixed(kNetworkDelay))
	sg := NewTestSessionGenerator(net)
	defer sg.Close()
	bs := sg.Instances(1)[0].Exchange

Jeromy's avatar
Jeromy committed
292 293 294 295
	ctx, cancel := context.WithTimeout(context.Background(), time.Second*5)
	defer cancel()

	_, err := bs.GetBlock(ctx, key.Key(""))
jbenet's avatar
jbenet committed
296 297 298 299 300
	if err != blockstore.ErrNotFound {
		t.Error("empty str key should return ErrNotFound")
	}
}

Jeromy's avatar
Jeromy committed
301
func TestBasicBitswap(t *testing.T) {
302
	net := tn.VirtualNetwork(mockrouting.NewServer(), delay.Fixed(kNetworkDelay))
303
	sg := NewTestSessionGenerator(net)
304
	defer sg.Close()
Jeromy's avatar
Jeromy committed
305 306
	bg := blocksutil.NewBlockGenerator()

307
	t.Log("Test a one node trying to get one block from another")
Jeromy's avatar
Jeromy committed
308 309 310

	instances := sg.Instances(2)
	blocks := bg.Blocks(1)
311
	err := instances[0].Exchange.HasBlock(blocks[0])
Jeromy's avatar
Jeromy committed
312 313 314 315
	if err != nil {
		t.Fatal(err)
	}

316
	ctx, cancel := context.WithTimeout(context.Background(), time.Second*5)
rht's avatar
rht committed
317
	defer cancel()
Jeromy's avatar
Jeromy committed
318 319 320 321 322 323 324 325 326 327 328 329 330
	blk, err := instances[1].Exchange.GetBlock(ctx, blocks[0].Key())
	if err != nil {
		t.Fatal(err)
	}

	t.Log(blk)
	for _, inst := range instances {
		err := inst.Exchange.Close()
		if err != nil {
			t.Fatal(err)
		}
	}
}
Jeromy's avatar
Jeromy committed
331 332 333 334 335 336 337 338 339 340 341 342 343 344 345 346 347 348 349 350 351 352 353 354 355 356

func TestDoubleGet(t *testing.T) {
	net := tn.VirtualNetwork(mockrouting.NewServer(), delay.Fixed(kNetworkDelay))
	sg := NewTestSessionGenerator(net)
	defer sg.Close()
	bg := blocksutil.NewBlockGenerator()

	t.Log("Test a one node trying to get one block from another")

	instances := sg.Instances(2)
	blocks := bg.Blocks(1)

	ctx1, cancel1 := context.WithCancel(context.Background())
	blkch1, err := instances[1].Exchange.GetBlocks(ctx1, []key.Key{blocks[0].Key()})
	if err != nil {
		t.Fatal(err)
	}

	ctx2, cancel2 := context.WithCancel(context.Background())
	defer cancel2()

	blkch2, err := instances[1].Exchange.GetBlocks(ctx2, []key.Key{blocks[0].Key()})
	if err != nil {
		t.Fatal(err)
	}

357 358
	// ensure both requests make it into the wantlist at the same time
	time.Sleep(time.Millisecond * 100)
Jeromy's avatar
Jeromy committed
359 360 361 362 363 364 365 366 367 368 369 370
	cancel1()

	_, ok := <-blkch1
	if ok {
		t.Fatal("expected channel to be closed")
	}

	err = instances[0].Exchange.HasBlock(blocks[0])
	if err != nil {
		t.Fatal(err)
	}

371 372 373 374 375 376 377 378
	select {
	case blk, ok := <-blkch2:
		if !ok {
			t.Fatal("expected to get the block here")
		}
		t.Log(blk)
	case <-time.After(time.Second * 5):
		t.Fatal("timed out waiting on block")
Jeromy's avatar
Jeromy committed
379 380 381 382 383 384 385 386 387
	}

	for _, inst := range instances {
		err := inst.Exchange.Close()
		if err != nil {
			t.Fatal(err)
		}
	}
}
388 389 390 391 392 393 394 395 396 397 398 399 400 401 402 403 404 405 406 407 408 409 410 411 412 413 414 415 416 417 418 419 420 421 422 423 424 425 426 427 428 429 430 431 432 433 434 435 436 437 438 439 440 441 442 443 444 445 446 447 448 449 450 451 452

func TestWantlistCleanup(t *testing.T) {
	net := tn.VirtualNetwork(mockrouting.NewServer(), delay.Fixed(kNetworkDelay))
	sg := NewTestSessionGenerator(net)
	defer sg.Close()
	bg := blocksutil.NewBlockGenerator()

	instances := sg.Instances(1)[0]
	bswap := instances.Exchange
	blocks := bg.Blocks(20)

	var keys []key.Key
	for _, b := range blocks {
		keys = append(keys, b.Key())
	}

	ctx, cancel := context.WithTimeout(context.Background(), time.Millisecond*50)
	defer cancel()
	_, err := bswap.GetBlock(ctx, keys[0])
	if err != context.DeadlineExceeded {
		t.Fatal("shouldnt have fetched any blocks")
	}

	time.Sleep(time.Millisecond * 50)

	if len(bswap.GetWantlist()) > 0 {
		t.Fatal("should not have anyting in wantlist")
	}

	ctx, cancel = context.WithTimeout(context.Background(), time.Millisecond*50)
	defer cancel()
	_, err = bswap.GetBlocks(ctx, keys[:10])
	if err != nil {
		t.Fatal(err)
	}

	<-ctx.Done()
	time.Sleep(time.Millisecond * 50)

	if len(bswap.GetWantlist()) > 0 {
		t.Fatal("should not have anyting in wantlist")
	}

	_, err = bswap.GetBlocks(context.Background(), keys[:1])
	if err != nil {
		t.Fatal(err)
	}

	ctx, cancel = context.WithCancel(context.Background())
	_, err = bswap.GetBlocks(ctx, keys[10:])
	if err != nil {
		t.Fatal(err)
	}

	time.Sleep(time.Millisecond * 50)
	if len(bswap.GetWantlist()) != 11 {
		t.Fatal("should have 11 keys in wantlist")
	}

	cancel()
	time.Sleep(time.Millisecond * 50)
	if !(len(bswap.GetWantlist()) == 1 && bswap.GetWantlist()[0] == keys[0]) {
		t.Fatal("should only have keys[0] in wantlist")
	}
}