bitswap_test.go 8.96 KB
Newer Older
1
package bitswap
Brian Tiger Chow's avatar
Brian Tiger Chow committed
2 3

import (
4
	"bytes"
5
	"sync"
Brian Tiger Chow's avatar
Brian Tiger Chow committed
6 7 8
	"testing"
	"time"

9
	detectrace "github.com/ipfs/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-detect-race"
10
	travis "github.com/ipfs/go-ipfs/thirdparty/testutil/ci/travis"
Jeromy's avatar
Jeromy committed
11
	context "gx/ipfs/QmZy2y8t9zQH2a1b8q2ZSLKp17ATuJoCNxxyMFG5qFExpt/go-net/context"
12 13

	blocks "github.com/ipfs/go-ipfs/blocks"
jbenet's avatar
jbenet committed
14
	blockstore "github.com/ipfs/go-ipfs/blocks/blockstore"
15
	blocksutil "github.com/ipfs/go-ipfs/blocks/blocksutil"
16
	key "github.com/ipfs/go-ipfs/blocks/key"
17 18 19
	tn "github.com/ipfs/go-ipfs/exchange/bitswap/testnet"
	mockrouting "github.com/ipfs/go-ipfs/routing/mock"
	delay "github.com/ipfs/go-ipfs/thirdparty/delay"
Jeromy's avatar
Jeromy committed
20
	p2ptestutil "gx/ipfs/Qmf4ETeAWXuThBfWwonVyFqGFSgTWepUDEr1txcctvpTXS/go-libp2p/p2p/test/util"
Brian Tiger Chow's avatar
Brian Tiger Chow committed
21 22
)

Brian Tiger Chow's avatar
Brian Tiger Chow committed
23 24
// FIXME the tests are really sensitive to the network delay. fix them to work
// well under varying conditions
25 26
const kNetworkDelay = 0 * time.Millisecond

27 28 29 30
func getVirtualNetwork() tn.Network {
	return tn.VirtualNetwork(mockrouting.NewServer(), delay.Fixed(kNetworkDelay))
}

31
func TestClose(t *testing.T) {
32
	vnet := getVirtualNetwork()
33
	sesgen := NewTestSessionGenerator(vnet)
Jeromy's avatar
Jeromy committed
34
	defer sesgen.Close()
35
	bgen := blocksutil.NewBlockGenerator()
36 37 38 39

	block := bgen.Next()
	bitswap := sesgen.Next()

40 41
	bitswap.Exchange.Close()
	bitswap.Exchange.GetBlock(context.Background(), block.Key())
42 43
}

Jeromy's avatar
Jeromy committed
44 45 46 47 48 49 50 51 52 53 54 55 56 57
func TestProviderForKeyButNetworkCannotFind(t *testing.T) { // TODO revisit this

	rs := mockrouting.NewServer()
	net := tn.VirtualNetwork(rs, delay.Fixed(kNetworkDelay))
	g := NewTestSessionGenerator(net)
	defer g.Close()

	block := blocks.NewBlock([]byte("block"))
	pinfo := p2ptestutil.RandTestBogusIdentityOrFatal(t)
	rs.Client(pinfo).Provide(context.Background(), block.Key()) // but not on network

	solo := g.Next()
	defer solo.Exchange.Close()

rht's avatar
rht committed
58 59
	ctx, cancel := context.WithTimeout(context.Background(), time.Nanosecond)
	defer cancel()
Jeromy's avatar
Jeromy committed
60 61 62 63 64 65 66
	_, err := solo.Exchange.GetBlock(ctx, block.Key())

	if err != context.DeadlineExceeded {
		t.Fatal("Expected DeadlineExceeded error")
	}
}

Brian Tiger Chow's avatar
Brian Tiger Chow committed
67 68
func TestGetBlockFromPeerAfterPeerAnnounces(t *testing.T) {

69
	net := tn.VirtualNetwork(mockrouting.NewServer(), delay.Fixed(kNetworkDelay))
70
	block := blocks.NewBlock([]byte("block"))
71
	g := NewTestSessionGenerator(net)
Jeromy's avatar
Jeromy committed
72
	defer g.Close()
Brian Tiger Chow's avatar
Brian Tiger Chow committed
73

74 75
	peers := g.Instances(2)
	hasBlock := peers[0]
Jeromy's avatar
Jeromy committed
76
	defer hasBlock.Exchange.Close()
Brian Tiger Chow's avatar
Brian Tiger Chow committed
77

78
	if err := hasBlock.Exchange.HasBlock(block); err != nil {
79 80
		t.Fatal(err)
	}
Brian Tiger Chow's avatar
Brian Tiger Chow committed
81

82
	wantsBlock := peers[1]
Jeromy's avatar
Jeromy committed
83
	defer wantsBlock.Exchange.Close()
Brian Tiger Chow's avatar
Brian Tiger Chow committed
84

rht's avatar
rht committed
85 86
	ctx, cancel := context.WithTimeout(context.Background(), time.Second)
	defer cancel()
87
	received, err := wantsBlock.Exchange.GetBlock(ctx, block.Key())
Brian Tiger Chow's avatar
Brian Tiger Chow committed
88 89 90 91
	if err != nil {
		t.Log(err)
		t.Fatal("Expected to succeed")
	}
92

93
	if !bytes.Equal(block.Data(), received.Data()) {
94 95
		t.Fatal("Data doesn't match")
	}
Brian Tiger Chow's avatar
Brian Tiger Chow committed
96 97
}

98
func TestLargeSwarm(t *testing.T) {
99 100 101
	if testing.Short() {
		t.SkipNow()
	}
102
	numInstances := 100
103
	numBlocks := 2
104 105 106 107
	if detectrace.WithRace() {
		// when running with the race detector, 500 instances launches
		// well over 8k goroutines. This hits a race detector limit.
		numInstances = 100
108 109
	} else if travis.IsRunning() {
		numInstances = 200
110 111 112
	} else {
		t.Parallel()
	}
113 114
	PerformDistributionTest(t, numInstances, numBlocks)
}
115

116 117 118
func TestLargeFile(t *testing.T) {
	if testing.Short() {
		t.SkipNow()
119
	}
120 121 122 123 124

	if !travis.IsRunning() {
		t.Parallel()
	}

125 126 127
	numInstances := 10
	numBlocks := 100
	PerformDistributionTest(t, numInstances, numBlocks)
128 129
}

130 131 132 133 134 135 136 137 138 139 140 141
func TestLargeFileNoRebroadcast(t *testing.T) {
	rbd := rebroadcastDelay.Get()
	rebroadcastDelay.Set(time.Hour * 24 * 365 * 10) // ten years should be long enough
	if testing.Short() {
		t.SkipNow()
	}
	numInstances := 10
	numBlocks := 100
	PerformDistributionTest(t, numInstances, numBlocks)
	rebroadcastDelay.Set(rbd)
}

142 143 144 145 146 147 148 149 150
func TestLargeFileTwoPeers(t *testing.T) {
	if testing.Short() {
		t.SkipNow()
	}
	numInstances := 2
	numBlocks := 100
	PerformDistributionTest(t, numInstances, numBlocks)
}

151
func PerformDistributionTest(t *testing.T, numInstances, numBlocks int) {
152
	ctx := context.Background()
153 154 155
	if testing.Short() {
		t.SkipNow()
	}
156
	net := tn.VirtualNetwork(mockrouting.NewServer(), delay.Fixed(kNetworkDelay))
157
	sg := NewTestSessionGenerator(net)
Jeromy's avatar
Jeromy committed
158
	defer sg.Close()
159
	bg := blocksutil.NewBlockGenerator()
160 161 162 163 164 165

	instances := sg.Instances(numInstances)
	blocks := bg.Blocks(numBlocks)

	t.Log("Give the blocks to the first instance")

166 167 168 169 170 171 172 173 174 175 176 177 178
	nump := len(instances) - 1
	// assert we're properly connected
	for _, inst := range instances {
		peers := inst.Exchange.wm.ConnectedPeers()
		for i := 0; i < 10 && len(peers) != nump; i++ {
			time.Sleep(time.Millisecond * 50)
			peers = inst.Exchange.wm.ConnectedPeers()
		}
		if len(peers) != nump {
			t.Fatal("not enough peers connected to instance")
		}
	}

179
	var blkeys []key.Key
180 181
	first := instances[0]
	for _, b := range blocks {
Jeromy's avatar
Jeromy committed
182
		blkeys = append(blkeys, b.Key())
183
		first.Exchange.HasBlock(b)
184 185 186 187
	}

	t.Log("Distribute!")

Jeromy's avatar
Jeromy committed
188
	wg := sync.WaitGroup{}
189 190
	errs := make(chan error)

191
	for _, inst := range instances[1:] {
Jeromy's avatar
Jeromy committed
192 193 194
		wg.Add(1)
		go func(inst Instance) {
			defer wg.Done()
195
			outch, err := inst.Exchange.GetBlocks(ctx, blkeys)
Jeromy's avatar
Jeromy committed
196
			if err != nil {
197
				errs <- err
Jeromy's avatar
Jeromy committed
198 199 200 201
			}
			for _ = range outch {
			}
		}(inst)
202
	}
203 204 205 206 207 208 209 210 211 212 213

	go func() {
		wg.Wait()
		close(errs)
	}()

	for err := range errs {
		if err != nil {
			t.Fatal(err)
		}
	}
214 215 216 217 218

	t.Log("Verify!")

	for _, inst := range instances {
		for _, b := range blocks {
219
			if _, err := inst.Blockstore().Get(b.Key()); err != nil {
220 221 222 223 224 225
				t.Fatal(err)
			}
		}
	}
}

226
func getOrFail(bitswap Instance, b blocks.Block, t *testing.T, wg *sync.WaitGroup) {
227
	if _, err := bitswap.Blockstore().Get(b.Key()); err != nil {
228
		_, err := bitswap.Exchange.GetBlock(context.Background(), b.Key())
229 230 231 232 233 234 235
		if err != nil {
			t.Fatal(err)
		}
	}
	wg.Done()
}

236
// TODO simplify this test. get to the _essence_!
237
func TestSendToWantingPeer(t *testing.T) {
Brian Tiger Chow's avatar
Brian Tiger Chow committed
238 239 240 241
	if testing.Short() {
		t.SkipNow()
	}

242
	net := tn.VirtualNetwork(mockrouting.NewServer(), delay.Fixed(kNetworkDelay))
243
	sg := NewTestSessionGenerator(net)
Jeromy's avatar
Jeromy committed
244
	defer sg.Close()
245
	bg := blocksutil.NewBlockGenerator()
246

Brian Tiger Chow's avatar
Brian Tiger Chow committed
247 248
	prev := rebroadcastDelay.Set(time.Second / 2)
	defer func() { rebroadcastDelay.Set(prev) }()
249

250 251 252
	peers := sg.Instances(2)
	peerA := peers[0]
	peerB := peers[1]
253

254 255
	t.Logf("Session %v\n", peerA.Peer)
	t.Logf("Session %v\n", peerB.Peer)
256

257
	waitTime := time.Second * 5
258

259 260
	alpha := bg.Next()
	// peerA requests and waits for block alpha
261
	ctx, cancel := context.WithTimeout(context.Background(), waitTime)
rht's avatar
rht committed
262
	defer cancel()
263
	alphaPromise, err := peerA.Exchange.GetBlocks(ctx, []key.Key{alpha.Key()})
264
	if err != nil {
265 266
		t.Fatal(err)
	}
267

268
	// peerB announces to the network that he has block alpha
269
	err = peerB.Exchange.HasBlock(alpha)
270
	if err != nil {
271 272
		t.Fatal(err)
	}
273

274 275 276 277
	// At some point, peerA should get alpha (or timeout)
	blkrecvd, ok := <-alphaPromise
	if !ok {
		t.Fatal("context timed out and broke promise channel!")
278
	}
279

280 281
	if blkrecvd.Key() != alpha.Key() {
		t.Fatal("Wrong block!")
282
	}
283

284
}
Jeromy's avatar
Jeromy committed
285

jbenet's avatar
jbenet committed
286 287 288 289 290 291 292 293 294 295 296 297
func TestEmptyKey(t *testing.T) {
	net := tn.VirtualNetwork(mockrouting.NewServer(), delay.Fixed(kNetworkDelay))
	sg := NewTestSessionGenerator(net)
	defer sg.Close()
	bs := sg.Instances(1)[0].Exchange

	_, err := bs.GetBlock(context.Background(), key.Key(""))
	if err != blockstore.ErrNotFound {
		t.Error("empty str key should return ErrNotFound")
	}
}

Jeromy's avatar
Jeromy committed
298
func TestBasicBitswap(t *testing.T) {
299
	net := tn.VirtualNetwork(mockrouting.NewServer(), delay.Fixed(kNetworkDelay))
300
	sg := NewTestSessionGenerator(net)
301
	defer sg.Close()
Jeromy's avatar
Jeromy committed
302 303
	bg := blocksutil.NewBlockGenerator()

304
	t.Log("Test a one node trying to get one block from another")
Jeromy's avatar
Jeromy committed
305 306 307

	instances := sg.Instances(2)
	blocks := bg.Blocks(1)
308
	err := instances[0].Exchange.HasBlock(blocks[0])
Jeromy's avatar
Jeromy committed
309 310 311 312
	if err != nil {
		t.Fatal(err)
	}

313
	ctx, cancel := context.WithTimeout(context.Background(), time.Second*5)
rht's avatar
rht committed
314
	defer cancel()
Jeromy's avatar
Jeromy committed
315 316 317 318 319 320 321 322 323 324 325 326 327
	blk, err := instances[1].Exchange.GetBlock(ctx, blocks[0].Key())
	if err != nil {
		t.Fatal(err)
	}

	t.Log(blk)
	for _, inst := range instances {
		err := inst.Exchange.Close()
		if err != nil {
			t.Fatal(err)
		}
	}
}
Jeromy's avatar
Jeromy committed
328 329 330 331 332 333 334 335 336 337 338 339 340 341 342 343 344 345 346 347 348 349 350 351 352 353 354

func TestDoubleGet(t *testing.T) {
	net := tn.VirtualNetwork(mockrouting.NewServer(), delay.Fixed(kNetworkDelay))
	sg := NewTestSessionGenerator(net)
	defer sg.Close()
	bg := blocksutil.NewBlockGenerator()

	t.Log("Test a one node trying to get one block from another")

	instances := sg.Instances(2)
	blocks := bg.Blocks(1)

	ctx1, cancel1 := context.WithCancel(context.Background())

	blkch1, err := instances[1].Exchange.GetBlocks(ctx1, []key.Key{blocks[0].Key()})
	if err != nil {
		t.Fatal(err)
	}

	ctx2, cancel2 := context.WithCancel(context.Background())
	defer cancel2()

	blkch2, err := instances[1].Exchange.GetBlocks(ctx2, []key.Key{blocks[0].Key()})
	if err != nil {
		t.Fatal(err)
	}

355 356
	// ensure both requests make it into the wantlist at the same time
	time.Sleep(time.Millisecond * 100)
Jeromy's avatar
Jeromy committed
357 358 359 360 361 362 363 364 365 366 367 368 369 370 371 372 373 374 375 376 377 378 379 380 381
	cancel1()

	_, ok := <-blkch1
	if ok {
		t.Fatal("expected channel to be closed")
	}

	err = instances[0].Exchange.HasBlock(blocks[0])
	if err != nil {
		t.Fatal(err)
	}

	blk, ok := <-blkch2
	if !ok {
		t.Fatal("expected to get the block here")
	}
	t.Log(blk)

	for _, inst := range instances {
		err := inst.Exchange.Close()
		if err != nil {
			t.Fatal(err)
		}
	}
}