bitswap_test.go 8.49 KB
Newer Older
1
package bitswap
Brian Tiger Chow's avatar
Brian Tiger Chow committed
2 3

import (
4
	"bytes"
5
	"sync"
Brian Tiger Chow's avatar
Brian Tiger Chow committed
6 7 8
	"testing"
	"time"

9
	detectrace "github.com/ipfs/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-detect-race"
10
	travis "github.com/ipfs/go-ipfs/thirdparty/testutil/ci/travis"
Jeromy's avatar
Jeromy committed
11
	context "gx/ipfs/QmZy2y8t9zQH2a1b8q2ZSLKp17ATuJoCNxxyMFG5qFExpt/go-net/context"
12 13 14

	blocks "github.com/ipfs/go-ipfs/blocks"
	blocksutil "github.com/ipfs/go-ipfs/blocks/blocksutil"
15
	key "github.com/ipfs/go-ipfs/blocks/key"
16 17 18
	tn "github.com/ipfs/go-ipfs/exchange/bitswap/testnet"
	mockrouting "github.com/ipfs/go-ipfs/routing/mock"
	delay "github.com/ipfs/go-ipfs/thirdparty/delay"
Jeromy's avatar
Jeromy committed
19
	p2ptestutil "gx/ipfs/QmXDvxcXUYn2DDnGKJwdQPxkJgG83jBTp5UmmNzeHzqbj5/go-libp2p/p2p/test/util"
Brian Tiger Chow's avatar
Brian Tiger Chow committed
20 21
)

Brian Tiger Chow's avatar
Brian Tiger Chow committed
22 23
// FIXME: the tests are really sensitive to the network delay. Fix them to work
// well under varying conditions.
24 25
// kNetworkDelay is the fixed delay passed to delay.Fixed whenever a test in
// this file constructs its virtual network. It is currently zero.
const kNetworkDelay = 0 * time.Millisecond

26
func TestClose(t *testing.T) {
27
	vnet := tn.VirtualNetwork(mockrouting.NewServer(), delay.Fixed(kNetworkDelay))
28
	sesgen := NewTestSessionGenerator(vnet)
Jeromy's avatar
Jeromy committed
29
	defer sesgen.Close()
30
	bgen := blocksutil.NewBlockGenerator()
31 32 33 34

	block := bgen.Next()
	bitswap := sesgen.Next()

35 36
	bitswap.Exchange.Close()
	bitswap.Exchange.GetBlock(context.Background(), block.Key())
37 38
}

Jeromy's avatar
Jeromy committed
39 40 41 42 43 44 45 46 47 48 49 50 51 52
func TestProviderForKeyButNetworkCannotFind(t *testing.T) { // TODO revisit this

	rs := mockrouting.NewServer()
	net := tn.VirtualNetwork(rs, delay.Fixed(kNetworkDelay))
	g := NewTestSessionGenerator(net)
	defer g.Close()

	block := blocks.NewBlock([]byte("block"))
	pinfo := p2ptestutil.RandTestBogusIdentityOrFatal(t)
	rs.Client(pinfo).Provide(context.Background(), block.Key()) // but not on network

	solo := g.Next()
	defer solo.Exchange.Close()

rht's avatar
rht committed
53 54
	ctx, cancel := context.WithTimeout(context.Background(), time.Nanosecond)
	defer cancel()
Jeromy's avatar
Jeromy committed
55 56 57 58 59 60 61
	_, err := solo.Exchange.GetBlock(ctx, block.Key())

	if err != context.DeadlineExceeded {
		t.Fatal("Expected DeadlineExceeded error")
	}
}

Brian Tiger Chow's avatar
Brian Tiger Chow committed
62 63
func TestGetBlockFromPeerAfterPeerAnnounces(t *testing.T) {

64
	net := tn.VirtualNetwork(mockrouting.NewServer(), delay.Fixed(kNetworkDelay))
65
	block := blocks.NewBlock([]byte("block"))
66
	g := NewTestSessionGenerator(net)
Jeromy's avatar
Jeromy committed
67
	defer g.Close()
Brian Tiger Chow's avatar
Brian Tiger Chow committed
68

69 70
	peers := g.Instances(2)
	hasBlock := peers[0]
Jeromy's avatar
Jeromy committed
71
	defer hasBlock.Exchange.Close()
Brian Tiger Chow's avatar
Brian Tiger Chow committed
72

73
	if err := hasBlock.Exchange.HasBlock(block); err != nil {
74 75
		t.Fatal(err)
	}
Brian Tiger Chow's avatar
Brian Tiger Chow committed
76

77
	wantsBlock := peers[1]
Jeromy's avatar
Jeromy committed
78
	defer wantsBlock.Exchange.Close()
Brian Tiger Chow's avatar
Brian Tiger Chow committed
79

rht's avatar
rht committed
80 81
	ctx, cancel := context.WithTimeout(context.Background(), time.Second)
	defer cancel()
82
	received, err := wantsBlock.Exchange.GetBlock(ctx, block.Key())
Brian Tiger Chow's avatar
Brian Tiger Chow committed
83 84 85 86
	if err != nil {
		t.Log(err)
		t.Fatal("Expected to succeed")
	}
87 88 89 90

	if !bytes.Equal(block.Data, received.Data) {
		t.Fatal("Data doesn't match")
	}
Brian Tiger Chow's avatar
Brian Tiger Chow committed
91 92
}

93
func TestLargeSwarm(t *testing.T) {
94 95 96
	if testing.Short() {
		t.SkipNow()
	}
97
	numInstances := 100
98
	numBlocks := 2
99 100 101 102
	if detectrace.WithRace() {
		// when running with the race detector, 500 instances launches
		// well over 8k goroutines. This hits a race detector limit.
		numInstances = 100
103 104
	} else if travis.IsRunning() {
		numInstances = 200
105 106 107
	} else {
		t.Parallel()
	}
108 109
	PerformDistributionTest(t, numInstances, numBlocks)
}
110

111 112 113
func TestLargeFile(t *testing.T) {
	if testing.Short() {
		t.SkipNow()
114
	}
115 116 117 118 119

	if !travis.IsRunning() {
		t.Parallel()
	}

120 121 122
	numInstances := 10
	numBlocks := 100
	PerformDistributionTest(t, numInstances, numBlocks)
123 124
}

125 126 127 128 129 130 131 132 133 134 135 136
// TestLargeFileNoRebroadcast runs the 10-instance / 100-block distribution
// test with the package-level rebroadcastDelay raised to ten years for the
// duration of the test. It is skipped in short mode.
//
// The skip check runs before the global is touched, and the original delay
// is restored with defer. t.SkipNow and t.Fatal both leave the test via
// runtime.Goexit, so a restore placed as the last statement would be skipped
// and the ten-year delay would leak into every later test.
func TestLargeFileNoRebroadcast(t *testing.T) {
	if testing.Short() {
		t.SkipNow()
	}

	rbd := rebroadcastDelay.Get()
	defer rebroadcastDelay.Set(rbd)
	rebroadcastDelay.Set(time.Hour * 24 * 365 * 10) // ten years should be long enough

	numInstances := 10
	numBlocks := 100
	PerformDistributionTest(t, numInstances, numBlocks)
}

137 138 139 140 141 142 143 144 145
// TestLargeFileTwoPeers runs the distribution test with just two instances
// exchanging 100 blocks. It is skipped in short mode.
func TestLargeFileTwoPeers(t *testing.T) {
	if testing.Short() {
		t.SkipNow()
	}

	const (
		peerCount  = 2
		blockCount = 100
	)
	PerformDistributionTest(t, peerCount, blockCount)
}

146
func PerformDistributionTest(t *testing.T, numInstances, numBlocks int) {
147
	ctx := context.Background()
148 149 150
	if testing.Short() {
		t.SkipNow()
	}
151
	net := tn.VirtualNetwork(mockrouting.NewServer(), delay.Fixed(kNetworkDelay))
152
	sg := NewTestSessionGenerator(net)
Jeromy's avatar
Jeromy committed
153
	defer sg.Close()
154
	bg := blocksutil.NewBlockGenerator()
155 156 157 158 159 160

	instances := sg.Instances(numInstances)
	blocks := bg.Blocks(numBlocks)

	t.Log("Give the blocks to the first instance")

161 162 163 164 165 166 167 168 169 170 171 172 173
	nump := len(instances) - 1
	// assert we're properly connected
	for _, inst := range instances {
		peers := inst.Exchange.wm.ConnectedPeers()
		for i := 0; i < 10 && len(peers) != nump; i++ {
			time.Sleep(time.Millisecond * 50)
			peers = inst.Exchange.wm.ConnectedPeers()
		}
		if len(peers) != nump {
			t.Fatal("not enough peers connected to instance")
		}
	}

174
	var blkeys []key.Key
175 176
	first := instances[0]
	for _, b := range blocks {
Jeromy's avatar
Jeromy committed
177
		blkeys = append(blkeys, b.Key())
178
		first.Exchange.HasBlock(b)
179 180 181 182
	}

	t.Log("Distribute!")

Jeromy's avatar
Jeromy committed
183
	wg := sync.WaitGroup{}
184 185
	errs := make(chan error)

186
	for _, inst := range instances[1:] {
Jeromy's avatar
Jeromy committed
187 188 189
		wg.Add(1)
		go func(inst Instance) {
			defer wg.Done()
190
			outch, err := inst.Exchange.GetBlocks(ctx, blkeys)
Jeromy's avatar
Jeromy committed
191
			if err != nil {
192
				errs <- err
Jeromy's avatar
Jeromy committed
193 194 195 196
			}
			for _ = range outch {
			}
		}(inst)
197
	}
198 199 200 201 202 203 204 205 206 207 208

	go func() {
		wg.Wait()
		close(errs)
	}()

	for err := range errs {
		if err != nil {
			t.Fatal(err)
		}
	}
209 210 211 212 213

	t.Log("Verify!")

	for _, inst := range instances {
		for _, b := range blocks {
214
			if _, err := inst.Blockstore().Get(b.Key()); err != nil {
215 216 217 218 219 220
				t.Fatal(err)
			}
		}
	}
}

221
func getOrFail(bitswap Instance, b *blocks.Block, t *testing.T, wg *sync.WaitGroup) {
222
	if _, err := bitswap.Blockstore().Get(b.Key()); err != nil {
223
		_, err := bitswap.Exchange.GetBlock(context.Background(), b.Key())
224 225 226 227 228 229 230
		if err != nil {
			t.Fatal(err)
		}
	}
	wg.Done()
}

231
// TODO simplify this test. get to the _essence_!
232
func TestSendToWantingPeer(t *testing.T) {
Brian Tiger Chow's avatar
Brian Tiger Chow committed
233 234 235 236
	if testing.Short() {
		t.SkipNow()
	}

237
	net := tn.VirtualNetwork(mockrouting.NewServer(), delay.Fixed(kNetworkDelay))
238
	sg := NewTestSessionGenerator(net)
Jeromy's avatar
Jeromy committed
239
	defer sg.Close()
240
	bg := blocksutil.NewBlockGenerator()
241

Brian Tiger Chow's avatar
Brian Tiger Chow committed
242 243
	prev := rebroadcastDelay.Set(time.Second / 2)
	defer func() { rebroadcastDelay.Set(prev) }()
244

245 246 247
	peers := sg.Instances(2)
	peerA := peers[0]
	peerB := peers[1]
248

249 250
	t.Logf("Session %v\n", peerA.Peer)
	t.Logf("Session %v\n", peerB.Peer)
251

252
	waitTime := time.Second * 5
253

254 255
	alpha := bg.Next()
	// peerA requests and waits for block alpha
256
	ctx, cancel := context.WithTimeout(context.Background(), waitTime)
rht's avatar
rht committed
257
	defer cancel()
258
	alphaPromise, err := peerA.Exchange.GetBlocks(ctx, []key.Key{alpha.Key()})
259
	if err != nil {
260 261
		t.Fatal(err)
	}
262

263
	// peerB announces to the network that he has block alpha
264
	err = peerB.Exchange.HasBlock(alpha)
265
	if err != nil {
266 267
		t.Fatal(err)
	}
268

269 270 271 272
	// At some point, peerA should get alpha (or timeout)
	blkrecvd, ok := <-alphaPromise
	if !ok {
		t.Fatal("context timed out and broke promise channel!")
273
	}
274

275 276
	if blkrecvd.Key() != alpha.Key() {
		t.Fatal("Wrong block!")
277
	}
278

279
}
Jeromy's avatar
Jeromy committed
280 281

func TestBasicBitswap(t *testing.T) {
282
	net := tn.VirtualNetwork(mockrouting.NewServer(), delay.Fixed(kNetworkDelay))
283
	sg := NewTestSessionGenerator(net)
284
	defer sg.Close()
Jeromy's avatar
Jeromy committed
285 286
	bg := blocksutil.NewBlockGenerator()

287
	t.Log("Test a one node trying to get one block from another")
Jeromy's avatar
Jeromy committed
288 289 290

	instances := sg.Instances(2)
	blocks := bg.Blocks(1)
291
	err := instances[0].Exchange.HasBlock(blocks[0])
Jeromy's avatar
Jeromy committed
292 293 294 295
	if err != nil {
		t.Fatal(err)
	}

296
	ctx, cancel := context.WithTimeout(context.Background(), time.Second*5)
rht's avatar
rht committed
297
	defer cancel()
Jeromy's avatar
Jeromy committed
298 299 300 301 302 303 304 305 306 307 308 309 310
	blk, err := instances[1].Exchange.GetBlock(ctx, blocks[0].Key())
	if err != nil {
		t.Fatal(err)
	}

	t.Log(blk)
	for _, inst := range instances {
		err := inst.Exchange.Close()
		if err != nil {
			t.Fatal(err)
		}
	}
}
Jeromy's avatar
Jeromy committed
311 312 313 314 315 316 317 318 319 320 321 322 323 324 325 326 327 328 329 330 331 332 333 334 335 336 337

func TestDoubleGet(t *testing.T) {
	net := tn.VirtualNetwork(mockrouting.NewServer(), delay.Fixed(kNetworkDelay))
	sg := NewTestSessionGenerator(net)
	defer sg.Close()
	bg := blocksutil.NewBlockGenerator()

	t.Log("Test a one node trying to get one block from another")

	instances := sg.Instances(2)
	blocks := bg.Blocks(1)

	ctx1, cancel1 := context.WithCancel(context.Background())

	blkch1, err := instances[1].Exchange.GetBlocks(ctx1, []key.Key{blocks[0].Key()})
	if err != nil {
		t.Fatal(err)
	}

	ctx2, cancel2 := context.WithCancel(context.Background())
	defer cancel2()

	blkch2, err := instances[1].Exchange.GetBlocks(ctx2, []key.Key{blocks[0].Key()})
	if err != nil {
		t.Fatal(err)
	}

338 339
	// ensure both requests make it into the wantlist at the same time
	time.Sleep(time.Millisecond * 100)
Jeromy's avatar
Jeromy committed
340 341 342 343 344 345 346 347 348 349 350 351 352 353 354 355 356 357 358 359 360 361 362 363 364
	cancel1()

	_, ok := <-blkch1
	if ok {
		t.Fatal("expected channel to be closed")
	}

	err = instances[0].Exchange.HasBlock(blocks[0])
	if err != nil {
		t.Fatal(err)
	}

	blk, ok := <-blkch2
	if !ok {
		t.Fatal("expected to get the block here")
	}
	t.Log(blk)

	for _, inst := range instances {
		err := inst.Exchange.Close()
		if err != nil {
			t.Fatal(err)
		}
	}
}