bitswap_test.go 8.89 KB
Newer Older
1
package bitswap
Brian Tiger Chow's avatar
Brian Tiger Chow committed
2 3

import (
4
	"bytes"
5
	"sync"
Brian Tiger Chow's avatar
Brian Tiger Chow committed
6 7 8
	"testing"
	"time"

9
	detectrace "github.com/ipfs/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-detect-race"
10
	travis "github.com/ipfs/go-ipfs/thirdparty/testutil/ci/travis"
Jeromy's avatar
Jeromy committed
11
	context "gx/ipfs/QmZy2y8t9zQH2a1b8q2ZSLKp17ATuJoCNxxyMFG5qFExpt/go-net/context"
12 13

	blocks "github.com/ipfs/go-ipfs/blocks"
jbenet's avatar
jbenet committed
14
	blockstore "github.com/ipfs/go-ipfs/blocks/blockstore"
15
	blocksutil "github.com/ipfs/go-ipfs/blocks/blocksutil"
16
	key "github.com/ipfs/go-ipfs/blocks/key"
17 18 19
	tn "github.com/ipfs/go-ipfs/exchange/bitswap/testnet"
	mockrouting "github.com/ipfs/go-ipfs/routing/mock"
	delay "github.com/ipfs/go-ipfs/thirdparty/delay"
Jeromy's avatar
Jeromy committed
20
	p2ptestutil "gx/ipfs/QmVCe3SNMjkcPgnpFhZs719dheq6xE7gJwjzV7aWcUM4Ms/go-libp2p/p2p/test/util"
Brian Tiger Chow's avatar
Brian Tiger Chow committed
21 22
)

Brian Tiger Chow's avatar
Brian Tiger Chow committed
23 24
// FIXME the tests are really sensitive to the network delay. fix them to work
// well under varying conditions
25 26
const kNetworkDelay = 0 * time.Millisecond

27
func TestClose(t *testing.T) {
28
	vnet := tn.VirtualNetwork(mockrouting.NewServer(), delay.Fixed(kNetworkDelay))
29
	sesgen := NewTestSessionGenerator(vnet)
Jeromy's avatar
Jeromy committed
30
	defer sesgen.Close()
31
	bgen := blocksutil.NewBlockGenerator()
32 33 34 35

	block := bgen.Next()
	bitswap := sesgen.Next()

36 37
	bitswap.Exchange.Close()
	bitswap.Exchange.GetBlock(context.Background(), block.Key())
38 39
}

Jeromy's avatar
Jeromy committed
40 41 42 43 44 45 46 47 48 49 50 51 52 53
func TestProviderForKeyButNetworkCannotFind(t *testing.T) { // TODO revisit this

	rs := mockrouting.NewServer()
	net := tn.VirtualNetwork(rs, delay.Fixed(kNetworkDelay))
	g := NewTestSessionGenerator(net)
	defer g.Close()

	block := blocks.NewBlock([]byte("block"))
	pinfo := p2ptestutil.RandTestBogusIdentityOrFatal(t)
	rs.Client(pinfo).Provide(context.Background(), block.Key()) // but not on network

	solo := g.Next()
	defer solo.Exchange.Close()

rht's avatar
rht committed
54 55
	ctx, cancel := context.WithTimeout(context.Background(), time.Nanosecond)
	defer cancel()
Jeromy's avatar
Jeromy committed
56 57 58 59 60 61 62
	_, err := solo.Exchange.GetBlock(ctx, block.Key())

	if err != context.DeadlineExceeded {
		t.Fatal("Expected DeadlineExceeded error")
	}
}

Brian Tiger Chow's avatar
Brian Tiger Chow committed
63 64
func TestGetBlockFromPeerAfterPeerAnnounces(t *testing.T) {

65
	net := tn.VirtualNetwork(mockrouting.NewServer(), delay.Fixed(kNetworkDelay))
66
	block := blocks.NewBlock([]byte("block"))
67
	g := NewTestSessionGenerator(net)
Jeromy's avatar
Jeromy committed
68
	defer g.Close()
Brian Tiger Chow's avatar
Brian Tiger Chow committed
69

70 71
	peers := g.Instances(2)
	hasBlock := peers[0]
Jeromy's avatar
Jeromy committed
72
	defer hasBlock.Exchange.Close()
Brian Tiger Chow's avatar
Brian Tiger Chow committed
73

74
	if err := hasBlock.Exchange.HasBlock(block); err != nil {
75 76
		t.Fatal(err)
	}
Brian Tiger Chow's avatar
Brian Tiger Chow committed
77

78
	wantsBlock := peers[1]
Jeromy's avatar
Jeromy committed
79
	defer wantsBlock.Exchange.Close()
Brian Tiger Chow's avatar
Brian Tiger Chow committed
80

rht's avatar
rht committed
81 82
	ctx, cancel := context.WithTimeout(context.Background(), time.Second)
	defer cancel()
83
	received, err := wantsBlock.Exchange.GetBlock(ctx, block.Key())
Brian Tiger Chow's avatar
Brian Tiger Chow committed
84 85 86 87
	if err != nil {
		t.Log(err)
		t.Fatal("Expected to succeed")
	}
88

89
	if !bytes.Equal(block.Data(), received.Data()) {
90 91
		t.Fatal("Data doesn't match")
	}
Brian Tiger Chow's avatar
Brian Tiger Chow committed
92 93
}

94
func TestLargeSwarm(t *testing.T) {
95 96 97
	if testing.Short() {
		t.SkipNow()
	}
98
	numInstances := 100
99
	numBlocks := 2
100 101 102 103
	if detectrace.WithRace() {
		// when running with the race detector, 500 instances launches
		// well over 8k goroutines. This hits a race detector limit.
		numInstances = 100
104 105
	} else if travis.IsRunning() {
		numInstances = 200
106 107 108
	} else {
		t.Parallel()
	}
109 110
	PerformDistributionTest(t, numInstances, numBlocks)
}
111

112 113 114
func TestLargeFile(t *testing.T) {
	if testing.Short() {
		t.SkipNow()
115
	}
116 117 118 119 120

	if !travis.IsRunning() {
		t.Parallel()
	}

121 122 123
	numInstances := 10
	numBlocks := 100
	PerformDistributionTest(t, numInstances, numBlocks)
124 125
}

126 127 128 129 130 131 132 133 134 135 136 137
func TestLargeFileNoRebroadcast(t *testing.T) {
	rbd := rebroadcastDelay.Get()
	rebroadcastDelay.Set(time.Hour * 24 * 365 * 10) // ten years should be long enough
	if testing.Short() {
		t.SkipNow()
	}
	numInstances := 10
	numBlocks := 100
	PerformDistributionTest(t, numInstances, numBlocks)
	rebroadcastDelay.Set(rbd)
}

138 139 140 141 142 143 144 145 146
func TestLargeFileTwoPeers(t *testing.T) {
	if testing.Short() {
		t.SkipNow()
	}
	numInstances := 2
	numBlocks := 100
	PerformDistributionTest(t, numInstances, numBlocks)
}

147
func PerformDistributionTest(t *testing.T, numInstances, numBlocks int) {
148
	ctx := context.Background()
149 150 151
	if testing.Short() {
		t.SkipNow()
	}
152
	net := tn.VirtualNetwork(mockrouting.NewServer(), delay.Fixed(kNetworkDelay))
153
	sg := NewTestSessionGenerator(net)
Jeromy's avatar
Jeromy committed
154
	defer sg.Close()
155
	bg := blocksutil.NewBlockGenerator()
156 157 158 159 160 161

	instances := sg.Instances(numInstances)
	blocks := bg.Blocks(numBlocks)

	t.Log("Give the blocks to the first instance")

162 163 164 165 166 167 168 169 170 171 172 173 174
	nump := len(instances) - 1
	// assert we're properly connected
	for _, inst := range instances {
		peers := inst.Exchange.wm.ConnectedPeers()
		for i := 0; i < 10 && len(peers) != nump; i++ {
			time.Sleep(time.Millisecond * 50)
			peers = inst.Exchange.wm.ConnectedPeers()
		}
		if len(peers) != nump {
			t.Fatal("not enough peers connected to instance")
		}
	}

175
	var blkeys []key.Key
176 177
	first := instances[0]
	for _, b := range blocks {
Jeromy's avatar
Jeromy committed
178
		blkeys = append(blkeys, b.Key())
179
		first.Exchange.HasBlock(b)
180 181 182 183
	}

	t.Log("Distribute!")

Jeromy's avatar
Jeromy committed
184
	wg := sync.WaitGroup{}
185 186
	errs := make(chan error)

187
	for _, inst := range instances[1:] {
Jeromy's avatar
Jeromy committed
188 189 190
		wg.Add(1)
		go func(inst Instance) {
			defer wg.Done()
191
			outch, err := inst.Exchange.GetBlocks(ctx, blkeys)
Jeromy's avatar
Jeromy committed
192
			if err != nil {
193
				errs <- err
Jeromy's avatar
Jeromy committed
194 195 196 197
			}
			for _ = range outch {
			}
		}(inst)
198
	}
199 200 201 202 203 204 205 206 207 208 209

	go func() {
		wg.Wait()
		close(errs)
	}()

	for err := range errs {
		if err != nil {
			t.Fatal(err)
		}
	}
210 211 212 213 214

	t.Log("Verify!")

	for _, inst := range instances {
		for _, b := range blocks {
215
			if _, err := inst.Blockstore().Get(b.Key()); err != nil {
216 217 218 219 220 221
				t.Fatal(err)
			}
		}
	}
}

222
func getOrFail(bitswap Instance, b blocks.Block, t *testing.T, wg *sync.WaitGroup) {
223
	if _, err := bitswap.Blockstore().Get(b.Key()); err != nil {
224
		_, err := bitswap.Exchange.GetBlock(context.Background(), b.Key())
225 226 227 228 229 230 231
		if err != nil {
			t.Fatal(err)
		}
	}
	wg.Done()
}

232
// TODO simplify this test. get to the _essence_!
233
func TestSendToWantingPeer(t *testing.T) {
Brian Tiger Chow's avatar
Brian Tiger Chow committed
234 235 236 237
	if testing.Short() {
		t.SkipNow()
	}

238
	net := tn.VirtualNetwork(mockrouting.NewServer(), delay.Fixed(kNetworkDelay))
239
	sg := NewTestSessionGenerator(net)
Jeromy's avatar
Jeromy committed
240
	defer sg.Close()
241
	bg := blocksutil.NewBlockGenerator()
242

Brian Tiger Chow's avatar
Brian Tiger Chow committed
243 244
	prev := rebroadcastDelay.Set(time.Second / 2)
	defer func() { rebroadcastDelay.Set(prev) }()
245

246 247 248
	peers := sg.Instances(2)
	peerA := peers[0]
	peerB := peers[1]
249

250 251
	t.Logf("Session %v\n", peerA.Peer)
	t.Logf("Session %v\n", peerB.Peer)
252

253
	waitTime := time.Second * 5
254

255 256
	alpha := bg.Next()
	// peerA requests and waits for block alpha
257
	ctx, cancel := context.WithTimeout(context.Background(), waitTime)
rht's avatar
rht committed
258
	defer cancel()
259
	alphaPromise, err := peerA.Exchange.GetBlocks(ctx, []key.Key{alpha.Key()})
260
	if err != nil {
261 262
		t.Fatal(err)
	}
263

264
	// peerB announces to the network that he has block alpha
265
	err = peerB.Exchange.HasBlock(alpha)
266
	if err != nil {
267 268
		t.Fatal(err)
	}
269

270 271 272 273
	// At some point, peerA should get alpha (or timeout)
	blkrecvd, ok := <-alphaPromise
	if !ok {
		t.Fatal("context timed out and broke promise channel!")
274
	}
275

276 277
	if blkrecvd.Key() != alpha.Key() {
		t.Fatal("Wrong block!")
278
	}
279

280
}
Jeromy's avatar
Jeromy committed
281

jbenet's avatar
jbenet committed
282 283 284 285 286 287 288 289 290 291 292 293
func TestEmptyKey(t *testing.T) {
	net := tn.VirtualNetwork(mockrouting.NewServer(), delay.Fixed(kNetworkDelay))
	sg := NewTestSessionGenerator(net)
	defer sg.Close()
	bs := sg.Instances(1)[0].Exchange

	_, err := bs.GetBlock(context.Background(), key.Key(""))
	if err != blockstore.ErrNotFound {
		t.Error("empty str key should return ErrNotFound")
	}
}

Jeromy's avatar
Jeromy committed
294
func TestBasicBitswap(t *testing.T) {
295
	net := tn.VirtualNetwork(mockrouting.NewServer(), delay.Fixed(kNetworkDelay))
296
	sg := NewTestSessionGenerator(net)
297
	defer sg.Close()
Jeromy's avatar
Jeromy committed
298 299
	bg := blocksutil.NewBlockGenerator()

300
	t.Log("Test a one node trying to get one block from another")
Jeromy's avatar
Jeromy committed
301 302 303

	instances := sg.Instances(2)
	blocks := bg.Blocks(1)
304
	err := instances[0].Exchange.HasBlock(blocks[0])
Jeromy's avatar
Jeromy committed
305 306 307 308
	if err != nil {
		t.Fatal(err)
	}

309
	ctx, cancel := context.WithTimeout(context.Background(), time.Second*5)
rht's avatar
rht committed
310
	defer cancel()
Jeromy's avatar
Jeromy committed
311 312 313 314 315 316 317 318 319 320 321 322 323
	blk, err := instances[1].Exchange.GetBlock(ctx, blocks[0].Key())
	if err != nil {
		t.Fatal(err)
	}

	t.Log(blk)
	for _, inst := range instances {
		err := inst.Exchange.Close()
		if err != nil {
			t.Fatal(err)
		}
	}
}
Jeromy's avatar
Jeromy committed
324 325 326 327 328 329 330 331 332 333 334 335 336 337 338 339 340 341 342 343 344 345 346 347 348 349 350

func TestDoubleGet(t *testing.T) {
	net := tn.VirtualNetwork(mockrouting.NewServer(), delay.Fixed(kNetworkDelay))
	sg := NewTestSessionGenerator(net)
	defer sg.Close()
	bg := blocksutil.NewBlockGenerator()

	t.Log("Test a one node trying to get one block from another")

	instances := sg.Instances(2)
	blocks := bg.Blocks(1)

	ctx1, cancel1 := context.WithCancel(context.Background())

	blkch1, err := instances[1].Exchange.GetBlocks(ctx1, []key.Key{blocks[0].Key()})
	if err != nil {
		t.Fatal(err)
	}

	ctx2, cancel2 := context.WithCancel(context.Background())
	defer cancel2()

	blkch2, err := instances[1].Exchange.GetBlocks(ctx2, []key.Key{blocks[0].Key()})
	if err != nil {
		t.Fatal(err)
	}

351 352
	// ensure both requests make it into the wantlist at the same time
	time.Sleep(time.Millisecond * 100)
Jeromy's avatar
Jeromy committed
353 354 355 356 357 358 359 360 361 362 363 364 365 366 367 368 369 370 371 372 373 374 375 376 377
	cancel1()

	_, ok := <-blkch1
	if ok {
		t.Fatal("expected channel to be closed")
	}

	err = instances[0].Exchange.HasBlock(blocks[0])
	if err != nil {
		t.Fatal(err)
	}

	blk, ok := <-blkch2
	if !ok {
		t.Fatal("expected to get the block here")
	}
	t.Log(blk)

	for _, inst := range instances {
		err := inst.Exchange.Close()
		if err != nil {
			t.Fatal(err)
		}
	}
}