bitswap_test.go 6.84 KB
Newer Older
1
package bitswap
Brian Tiger Chow's avatar
Brian Tiger Chow committed
2 3

import (
4
	"bytes"
5
	"sync"
Brian Tiger Chow's avatar
Brian Tiger Chow committed
6 7 8
	"testing"
	"time"

9 10
	detectrace "github.com/ipfs/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-detect-race"
	context "github.com/ipfs/go-ipfs/Godeps/_workspace/src/golang.org/x/net/context"
11
	travis "github.com/ipfs/go-ipfs/util/testutil/ci/travis"
12 13 14

	blocks "github.com/ipfs/go-ipfs/blocks"
	blocksutil "github.com/ipfs/go-ipfs/blocks/blocksutil"
15
	key "github.com/ipfs/go-ipfs/blocks/key"
16
	tn "github.com/ipfs/go-ipfs/exchange/bitswap/testnet"
Jeromy's avatar
Jeromy committed
17
	p2ptestutil "github.com/ipfs/go-ipfs/p2p/test/util"
18 19
	mockrouting "github.com/ipfs/go-ipfs/routing/mock"
	delay "github.com/ipfs/go-ipfs/thirdparty/delay"
Brian Tiger Chow's avatar
Brian Tiger Chow committed
20 21
)

Brian Tiger Chow's avatar
Brian Tiger Chow committed
22 23
// FIXME: the tests are really sensitive to the network delay. Fix them to work
// well under varying conditions.
24 25
const kNetworkDelay = 0 * time.Millisecond

26
func TestClose(t *testing.T) {
27
	vnet := tn.VirtualNetwork(mockrouting.NewServer(), delay.Fixed(kNetworkDelay))
28
	sesgen := NewTestSessionGenerator(vnet)
Jeromy's avatar
Jeromy committed
29
	defer sesgen.Close()
30
	bgen := blocksutil.NewBlockGenerator()
31 32 33 34

	block := bgen.Next()
	bitswap := sesgen.Next()

35 36
	bitswap.Exchange.Close()
	bitswap.Exchange.GetBlock(context.Background(), block.Key())
37 38
}

Jeromy's avatar
Jeromy committed
39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60
// TestProviderForKeyButNetworkCannotFind registers a provider record for a
// block with the mock routing server under a bogus identity that is never
// added to the virtual network, then expects GetBlock on a lone instance to
// fail with context.DeadlineExceeded.
func TestProviderForKeyButNetworkCannotFind(t *testing.T) { // TODO revisit this

	rs := mockrouting.NewServer()
	net := tn.VirtualNetwork(rs, delay.Fixed(kNetworkDelay))
	g := NewTestSessionGenerator(net)
	defer g.Close()

	block := blocks.NewBlock([]byte("block"))
	pinfo := p2ptestutil.RandTestBogusIdentityOrFatal(t)
	rs.Client(pinfo).Provide(context.Background(), block.Key()) // but not on network

	solo := g.Next()
	defer solo.Exchange.Close()

	ctx, cancel := context.WithTimeout(context.Background(), time.Nanosecond)
	// Always release the timeout context's resources, even on early failure.
	defer cancel()
	_, err := solo.Exchange.GetBlock(ctx, block.Key())

	if err != context.DeadlineExceeded {
		t.Fatalf("Expected DeadlineExceeded error, got: %v", err)
	}
}

Brian Tiger Chow's avatar
Brian Tiger Chow committed
61 62
func TestGetBlockFromPeerAfterPeerAnnounces(t *testing.T) {

63
	net := tn.VirtualNetwork(mockrouting.NewServer(), delay.Fixed(kNetworkDelay))
64
	block := blocks.NewBlock([]byte("block"))
65
	g := NewTestSessionGenerator(net)
Jeromy's avatar
Jeromy committed
66
	defer g.Close()
Brian Tiger Chow's avatar
Brian Tiger Chow committed
67

68 69
	peers := g.Instances(2)
	hasBlock := peers[0]
Jeromy's avatar
Jeromy committed
70
	defer hasBlock.Exchange.Close()
Brian Tiger Chow's avatar
Brian Tiger Chow committed
71

72
	if err := hasBlock.Exchange.HasBlock(context.Background(), block); err != nil {
73 74
		t.Fatal(err)
	}
Brian Tiger Chow's avatar
Brian Tiger Chow committed
75

76
	wantsBlock := peers[1]
Jeromy's avatar
Jeromy committed
77
	defer wantsBlock.Exchange.Close()
Brian Tiger Chow's avatar
Brian Tiger Chow committed
78 79

	ctx, _ := context.WithTimeout(context.Background(), time.Second)
80
	received, err := wantsBlock.Exchange.GetBlock(ctx, block.Key())
Brian Tiger Chow's avatar
Brian Tiger Chow committed
81 82 83 84
	if err != nil {
		t.Log(err)
		t.Fatal("Expected to succeed")
	}
85 86 87 88

	if !bytes.Equal(block.Data, received.Data) {
		t.Fatal("Data doesn't match")
	}
Brian Tiger Chow's avatar
Brian Tiger Chow committed
89 90
}

91
func TestLargeSwarm(t *testing.T) {
92 93 94
	if testing.Short() {
		t.SkipNow()
	}
95
	numInstances := 100
96
	numBlocks := 2
97 98 99 100
	if detectrace.WithRace() {
		// when running with the race detector, 500 instances launches
		// well over 8k goroutines. This hits a race detector limit.
		numInstances = 100
101 102
	} else if travis.IsRunning() {
		numInstances = 200
103 104 105
	} else {
		t.Parallel()
	}
106 107
	PerformDistributionTest(t, numInstances, numBlocks)
}
108

109 110 111
func TestLargeFile(t *testing.T) {
	if testing.Short() {
		t.SkipNow()
112
	}
113 114 115 116 117

	if !travis.IsRunning() {
		t.Parallel()
	}

118 119 120
	numInstances := 10
	numBlocks := 100
	PerformDistributionTest(t, numInstances, numBlocks)
121 122
}

123 124 125 126 127 128 129 130 131 132 133 134
// TestLargeFileNoRebroadcast runs the distribution test with 100 blocks
// across 10 instances while rebroadcastDelay is set to roughly ten years.
// It is skipped in short mode.
//
// The short-mode check happens before the package-level delay is touched,
// and the previous delay is restored with a defer. t.SkipNow and t.Fatal
// leave the function without running trailing statements, so a plain
// restore at the end would be skipped and the override would leak into
// later tests.
func TestLargeFileNoRebroadcast(t *testing.T) {
	if testing.Short() {
		t.SkipNow()
	}

	prev := rebroadcastDelay.Set(time.Hour * 24 * 365 * 10) // ten years should be long enough
	defer func() { rebroadcastDelay.Set(prev) }()

	numInstances := 10
	numBlocks := 100
	PerformDistributionTest(t, numInstances, numBlocks)
}

135 136 137 138 139 140 141 142 143
// TestLargeFileTwoPeers runs the distribution test with 100 blocks between
// exactly two instances. It is skipped in short mode.
func TestLargeFileTwoPeers(t *testing.T) {
	if testing.Short() {
		t.SkipNow()
	}

	const (
		peerCount  = 2
		blockCount = 100
	)
	PerformDistributionTest(t, peerCount, blockCount)
}

144
func PerformDistributionTest(t *testing.T, numInstances, numBlocks int) {
145 146 147
	if testing.Short() {
		t.SkipNow()
	}
148
	net := tn.VirtualNetwork(mockrouting.NewServer(), delay.Fixed(kNetworkDelay))
149
	sg := NewTestSessionGenerator(net)
Jeromy's avatar
Jeromy committed
150
	defer sg.Close()
151
	bg := blocksutil.NewBlockGenerator()
152 153 154 155 156 157

	instances := sg.Instances(numInstances)
	blocks := bg.Blocks(numBlocks)

	t.Log("Give the blocks to the first instance")

158
	var blkeys []key.Key
159 160
	first := instances[0]
	for _, b := range blocks {
Jeromy's avatar
Jeromy committed
161
		blkeys = append(blkeys, b.Key())
162
		first.Exchange.HasBlock(context.Background(), b)
163 164 165 166
	}

	t.Log("Distribute!")

Jeromy's avatar
Jeromy committed
167
	wg := sync.WaitGroup{}
168
	for _, inst := range instances[1:] {
Jeromy's avatar
Jeromy committed
169 170 171 172 173 174 175 176 177 178
		wg.Add(1)
		go func(inst Instance) {
			defer wg.Done()
			outch, err := inst.Exchange.GetBlocks(context.TODO(), blkeys)
			if err != nil {
				t.Fatal(err)
			}
			for _ = range outch {
			}
		}(inst)
179 180 181 182 183 184 185
	}
	wg.Wait()

	t.Log("Verify!")

	for _, inst := range instances {
		for _, b := range blocks {
186
			if _, err := inst.Blockstore().Get(b.Key()); err != nil {
187 188 189 190 191 192
				t.Fatal(err)
			}
		}
	}
}

193
func getOrFail(bitswap Instance, b *blocks.Block, t *testing.T, wg *sync.WaitGroup) {
194
	if _, err := bitswap.Blockstore().Get(b.Key()); err != nil {
195
		_, err := bitswap.Exchange.GetBlock(context.Background(), b.Key())
196 197 198 199 200 201 202
		if err != nil {
			t.Fatal(err)
		}
	}
	wg.Done()
}

203
// TODO simplify this test. get to the _essence_!
204
func TestSendToWantingPeer(t *testing.T) {
Brian Tiger Chow's avatar
Brian Tiger Chow committed
205 206 207 208
	if testing.Short() {
		t.SkipNow()
	}

209
	net := tn.VirtualNetwork(mockrouting.NewServer(), delay.Fixed(kNetworkDelay))
210
	sg := NewTestSessionGenerator(net)
Jeromy's avatar
Jeromy committed
211
	defer sg.Close()
212
	bg := blocksutil.NewBlockGenerator()
213

Brian Tiger Chow's avatar
Brian Tiger Chow committed
214 215
	prev := rebroadcastDelay.Set(time.Second / 2)
	defer func() { rebroadcastDelay.Set(prev) }()
216

217 218 219
	peers := sg.Instances(2)
	peerA := peers[0]
	peerB := peers[1]
220

221 222
	t.Logf("Session %v\n", peerA.Peer)
	t.Logf("Session %v\n", peerB.Peer)
223

224 225
	timeout := time.Second
	waitTime := time.Second * 5
226

227 228 229
	alpha := bg.Next()
	// peerA requests and waits for block alpha
	ctx, _ := context.WithTimeout(context.TODO(), waitTime)
230
	alphaPromise, err := peerA.Exchange.GetBlocks(ctx, []key.Key{alpha.Key()})
231
	if err != nil {
232 233
		t.Fatal(err)
	}
234

235 236 237 238
	// peerB announces to the network that he has block alpha
	ctx, _ = context.WithTimeout(context.TODO(), timeout)
	err = peerB.Exchange.HasBlock(ctx, alpha)
	if err != nil {
239 240
		t.Fatal(err)
	}
241

242 243 244 245
	// At some point, peerA should get alpha (or timeout)
	blkrecvd, ok := <-alphaPromise
	if !ok {
		t.Fatal("context timed out and broke promise channel!")
246
	}
247

248 249
	if blkrecvd.Key() != alpha.Key() {
		t.Fatal("Wrong block!")
250
	}
251

252
}
Jeromy's avatar
Jeromy committed
253 254

func TestBasicBitswap(t *testing.T) {
255
	net := tn.VirtualNetwork(mockrouting.NewServer(), delay.Fixed(kNetworkDelay))
256
	sg := NewTestSessionGenerator(net)
257
	defer sg.Close()
Jeromy's avatar
Jeromy committed
258 259
	bg := blocksutil.NewBlockGenerator()

260
	t.Log("Test a one node trying to get one block from another")
Jeromy's avatar
Jeromy committed
261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 278 279 280 281 282

	instances := sg.Instances(2)
	blocks := bg.Blocks(1)
	err := instances[0].Exchange.HasBlock(context.TODO(), blocks[0])
	if err != nil {
		t.Fatal(err)
	}

	ctx, _ := context.WithTimeout(context.TODO(), time.Second*5)
	blk, err := instances[1].Exchange.GetBlock(ctx, blocks[0].Key())
	if err != nil {
		t.Fatal(err)
	}

	t.Log(blk)
	for _, inst := range instances {
		err := inst.Exchange.Close()
		if err != nil {
			t.Fatal(err)
		}
	}
}