bitswap_test.go 7.33 KB
Newer Older
1
package bitswap
Brian Tiger Chow's avatar
Brian Tiger Chow committed
2 3

import (
4
	"bytes"
5
	"sync"
Brian Tiger Chow's avatar
Brian Tiger Chow committed
6 7 8
	"testing"
	"time"

9
	detectrace "github.com/ipfs/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-detect-race"
10
	travis "github.com/ipfs/go-ipfs/util/testutil/ci/travis"
Jeromy's avatar
Jeromy committed
11
	context "gx/ipfs/QmZy2y8t9zQH2a1b8q2ZSLKp17ATuJoCNxxyMFG5qFExpt/go-net/context"
12 13 14

	blocks "github.com/ipfs/go-ipfs/blocks"
	blocksutil "github.com/ipfs/go-ipfs/blocks/blocksutil"
15
	key "github.com/ipfs/go-ipfs/blocks/key"
16 17 18
	tn "github.com/ipfs/go-ipfs/exchange/bitswap/testnet"
	mockrouting "github.com/ipfs/go-ipfs/routing/mock"
	delay "github.com/ipfs/go-ipfs/thirdparty/delay"
Jeromy's avatar
Jeromy committed
19
	p2ptestutil "gx/ipfs/QmUBogf4nUefBjmYjn6jfsfPJRkmDGSeMhNj4usRKq69f4/go-libp2p/p2p/test/util"
Brian Tiger Chow's avatar
Brian Tiger Chow committed
20 21
)

Brian Tiger Chow's avatar
Brian Tiger Chow committed
22 23
// FIXME: the tests are really sensitive to the network delay. Fix them to work
// well under varying conditions.
24 25
// kNetworkDelay is the fixed delay passed to delay.Fixed whenever the tests
// below build a virtual network.
const kNetworkDelay = 0 * time.Millisecond

26
func TestClose(t *testing.T) {
27
	vnet := tn.VirtualNetwork(mockrouting.NewServer(), delay.Fixed(kNetworkDelay))
28
	sesgen := NewTestSessionGenerator(vnet)
Jeromy's avatar
Jeromy committed
29
	defer sesgen.Close()
30
	bgen := blocksutil.NewBlockGenerator()
31 32 33 34

	block := bgen.Next()
	bitswap := sesgen.Next()

35 36
	bitswap.Exchange.Close()
	bitswap.Exchange.GetBlock(context.Background(), block.Key())
37 38
}

Jeromy's avatar
Jeromy committed
39 40 41 42 43 44 45 46 47 48 49 50 51 52
func TestProviderForKeyButNetworkCannotFind(t *testing.T) { // TODO revisit this

	rs := mockrouting.NewServer()
	net := tn.VirtualNetwork(rs, delay.Fixed(kNetworkDelay))
	g := NewTestSessionGenerator(net)
	defer g.Close()

	block := blocks.NewBlock([]byte("block"))
	pinfo := p2ptestutil.RandTestBogusIdentityOrFatal(t)
	rs.Client(pinfo).Provide(context.Background(), block.Key()) // but not on network

	solo := g.Next()
	defer solo.Exchange.Close()

rht's avatar
rht committed
53 54
	ctx, cancel := context.WithTimeout(context.Background(), time.Nanosecond)
	defer cancel()
Jeromy's avatar
Jeromy committed
55 56 57 58 59 60 61
	_, err := solo.Exchange.GetBlock(ctx, block.Key())

	if err != context.DeadlineExceeded {
		t.Fatal("Expected DeadlineExceeded error")
	}
}

Brian Tiger Chow's avatar
Brian Tiger Chow committed
62 63
func TestGetBlockFromPeerAfterPeerAnnounces(t *testing.T) {

64
	net := tn.VirtualNetwork(mockrouting.NewServer(), delay.Fixed(kNetworkDelay))
65
	block := blocks.NewBlock([]byte("block"))
66
	g := NewTestSessionGenerator(net)
Jeromy's avatar
Jeromy committed
67
	defer g.Close()
Brian Tiger Chow's avatar
Brian Tiger Chow committed
68

69 70
	peers := g.Instances(2)
	hasBlock := peers[0]
Jeromy's avatar
Jeromy committed
71
	defer hasBlock.Exchange.Close()
Brian Tiger Chow's avatar
Brian Tiger Chow committed
72

73
	if err := hasBlock.Exchange.HasBlock(block); err != nil {
74 75
		t.Fatal(err)
	}
Brian Tiger Chow's avatar
Brian Tiger Chow committed
76

77
	wantsBlock := peers[1]
Jeromy's avatar
Jeromy committed
78
	defer wantsBlock.Exchange.Close()
Brian Tiger Chow's avatar
Brian Tiger Chow committed
79

rht's avatar
rht committed
80 81
	ctx, cancel := context.WithTimeout(context.Background(), time.Second)
	defer cancel()
82
	received, err := wantsBlock.Exchange.GetBlock(ctx, block.Key())
Brian Tiger Chow's avatar
Brian Tiger Chow committed
83 84 85 86
	if err != nil {
		t.Log(err)
		t.Fatal("Expected to succeed")
	}
87 88 89 90

	if !bytes.Equal(block.Data, received.Data) {
		t.Fatal("Data doesn't match")
	}
Brian Tiger Chow's avatar
Brian Tiger Chow committed
91 92
}

93
func TestLargeSwarm(t *testing.T) {
94 95 96
	if testing.Short() {
		t.SkipNow()
	}
97
	numInstances := 100
98
	numBlocks := 2
99 100 101 102
	if detectrace.WithRace() {
		// when running with the race detector, 500 instances launches
		// well over 8k goroutines. This hits a race detector limit.
		numInstances = 100
103 104
	} else if travis.IsRunning() {
		numInstances = 200
105 106 107
	} else {
		t.Parallel()
	}
108 109
	PerformDistributionTest(t, numInstances, numBlocks)
}
110

111 112 113
func TestLargeFile(t *testing.T) {
	if testing.Short() {
		t.SkipNow()
114
	}
115 116 117 118 119

	if !travis.IsRunning() {
		t.Parallel()
	}

120 121 122
	numInstances := 10
	numBlocks := 100
	PerformDistributionTest(t, numInstances, numBlocks)
123 124
}

125 126 127 128 129 130 131 132 133 134 135 136
// TestLargeFileNoRebroadcast distributes 100 blocks across 10 instances with
// the package-level rebroadcastDelay raised so high that no rebroadcast can
// happen during the test. It is skipped in short mode.
//
// The override is applied only after the skip check and undone in a defer, so
// the previous delay is restored even when the test is skipped or
// PerformDistributionTest aborts through t.Fatal.
func TestLargeFileNoRebroadcast(t *testing.T) {
	if testing.Short() {
		t.SkipNow()
	}

	prev := rebroadcastDelay.Set(time.Hour * 24 * 365 * 10) // ten years should be long enough
	defer func() { rebroadcastDelay.Set(prev) }()

	numInstances := 10
	numBlocks := 100
	PerformDistributionTest(t, numInstances, numBlocks)
}

137 138 139 140 141 142 143 144 145
// TestLargeFileTwoPeers distributes 100 blocks between exactly two instances.
// It is skipped in short mode.
func TestLargeFileTwoPeers(t *testing.T) {
	if testing.Short() {
		t.SkipNow()
	}

	const (
		peerCount  = 2
		blockCount = 100
	)
	PerformDistributionTest(t, peerCount, blockCount)
}

146
func PerformDistributionTest(t *testing.T, numInstances, numBlocks int) {
147
	ctx := context.Background()
148 149 150
	if testing.Short() {
		t.SkipNow()
	}
151
	net := tn.VirtualNetwork(mockrouting.NewServer(), delay.Fixed(kNetworkDelay))
152
	sg := NewTestSessionGenerator(net)
Jeromy's avatar
Jeromy committed
153
	defer sg.Close()
154
	bg := blocksutil.NewBlockGenerator()
155 156 157 158 159 160

	instances := sg.Instances(numInstances)
	blocks := bg.Blocks(numBlocks)

	t.Log("Give the blocks to the first instance")

161 162 163 164 165 166 167 168 169 170 171 172 173
	nump := len(instances) - 1
	// assert we're properly connected
	for _, inst := range instances {
		peers := inst.Exchange.wm.ConnectedPeers()
		for i := 0; i < 10 && len(peers) != nump; i++ {
			time.Sleep(time.Millisecond * 50)
			peers = inst.Exchange.wm.ConnectedPeers()
		}
		if len(peers) != nump {
			t.Fatal("not enough peers connected to instance")
		}
	}

174
	var blkeys []key.Key
175 176
	first := instances[0]
	for _, b := range blocks {
Jeromy's avatar
Jeromy committed
177
		blkeys = append(blkeys, b.Key())
178
		first.Exchange.HasBlock(b)
179 180 181 182
	}

	t.Log("Distribute!")

Jeromy's avatar
Jeromy committed
183
	wg := sync.WaitGroup{}
184 185
	errs := make(chan error)

186
	for _, inst := range instances[1:] {
Jeromy's avatar
Jeromy committed
187 188 189
		wg.Add(1)
		go func(inst Instance) {
			defer wg.Done()
190
			outch, err := inst.Exchange.GetBlocks(ctx, blkeys)
Jeromy's avatar
Jeromy committed
191
			if err != nil {
192
				errs <- err
Jeromy's avatar
Jeromy committed
193 194 195 196
			}
			for _ = range outch {
			}
		}(inst)
197
	}
198 199 200 201 202 203 204 205 206 207 208

	go func() {
		wg.Wait()
		close(errs)
	}()

	for err := range errs {
		if err != nil {
			t.Fatal(err)
		}
	}
209 210 211 212 213

	t.Log("Verify!")

	for _, inst := range instances {
		for _, b := range blocks {
214
			if _, err := inst.Blockstore().Get(b.Key()); err != nil {
215 216 217 218 219 220
				t.Fatal(err)
			}
		}
	}
}

221
func getOrFail(bitswap Instance, b *blocks.Block, t *testing.T, wg *sync.WaitGroup) {
222
	if _, err := bitswap.Blockstore().Get(b.Key()); err != nil {
223
		_, err := bitswap.Exchange.GetBlock(context.Background(), b.Key())
224 225 226 227 228 229 230
		if err != nil {
			t.Fatal(err)
		}
	}
	wg.Done()
}

231
// TODO simplify this test. get to the _essence_!
232
func TestSendToWantingPeer(t *testing.T) {
Brian Tiger Chow's avatar
Brian Tiger Chow committed
233 234 235 236
	if testing.Short() {
		t.SkipNow()
	}

237
	net := tn.VirtualNetwork(mockrouting.NewServer(), delay.Fixed(kNetworkDelay))
238
	sg := NewTestSessionGenerator(net)
Jeromy's avatar
Jeromy committed
239
	defer sg.Close()
240
	bg := blocksutil.NewBlockGenerator()
241

Brian Tiger Chow's avatar
Brian Tiger Chow committed
242 243
	prev := rebroadcastDelay.Set(time.Second / 2)
	defer func() { rebroadcastDelay.Set(prev) }()
244

245 246 247
	peers := sg.Instances(2)
	peerA := peers[0]
	peerB := peers[1]
248

249 250
	t.Logf("Session %v\n", peerA.Peer)
	t.Logf("Session %v\n", peerB.Peer)
251

252
	waitTime := time.Second * 5
253

254 255
	alpha := bg.Next()
	// peerA requests and waits for block alpha
256
	ctx, cancel := context.WithTimeout(context.Background(), waitTime)
rht's avatar
rht committed
257
	defer cancel()
258
	alphaPromise, err := peerA.Exchange.GetBlocks(ctx, []key.Key{alpha.Key()})
259
	if err != nil {
260 261
		t.Fatal(err)
	}
262

263
	// peerB announces to the network that he has block alpha
264
	err = peerB.Exchange.HasBlock(alpha)
265
	if err != nil {
266 267
		t.Fatal(err)
	}
268

269 270 271 272
	// At some point, peerA should get alpha (or timeout)
	blkrecvd, ok := <-alphaPromise
	if !ok {
		t.Fatal("context timed out and broke promise channel!")
273
	}
274

275 276
	if blkrecvd.Key() != alpha.Key() {
		t.Fatal("Wrong block!")
277
	}
278

279
}
Jeromy's avatar
Jeromy committed
280 281

func TestBasicBitswap(t *testing.T) {
282
	net := tn.VirtualNetwork(mockrouting.NewServer(), delay.Fixed(kNetworkDelay))
283
	sg := NewTestSessionGenerator(net)
284
	defer sg.Close()
Jeromy's avatar
Jeromy committed
285 286
	bg := blocksutil.NewBlockGenerator()

287
	t.Log("Test a one node trying to get one block from another")
Jeromy's avatar
Jeromy committed
288 289 290

	instances := sg.Instances(2)
	blocks := bg.Blocks(1)
291
	err := instances[0].Exchange.HasBlock(blocks[0])
Jeromy's avatar
Jeromy committed
292 293 294 295
	if err != nil {
		t.Fatal(err)
	}

296
	ctx, cancel := context.WithTimeout(context.Background(), time.Second*5)
rht's avatar
rht committed
297
	defer cancel()
Jeromy's avatar
Jeromy committed
298 299 300 301 302 303 304 305 306 307 308 309 310
	blk, err := instances[1].Exchange.GetBlock(ctx, blocks[0].Key())
	if err != nil {
		t.Fatal(err)
	}

	t.Log(blk)
	for _, inst := range instances {
		err := inst.Exchange.Close()
		if err != nil {
			t.Fatal(err)
		}
	}
}