bitswap_test.go 6.53 KB
Newer Older
1
package bitswap
Brian Tiger Chow's avatar
Brian Tiger Chow committed
2 3

import (
4
	"bytes"
5
	"sync"
Brian Tiger Chow's avatar
Brian Tiger Chow committed
6 7 8
	"testing"
	"time"

9 10
	detectrace "github.com/ipfs/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-detect-race"
	context "github.com/ipfs/go-ipfs/Godeps/_workspace/src/golang.org/x/net/context"
11
	travis "github.com/ipfs/go-ipfs/util/testutil/ci/travis"
12 13 14 15

	blocks "github.com/ipfs/go-ipfs/blocks"
	blocksutil "github.com/ipfs/go-ipfs/blocks/blocksutil"
	tn "github.com/ipfs/go-ipfs/exchange/bitswap/testnet"
Jeromy's avatar
Jeromy committed
16
	p2ptestutil "github.com/ipfs/go-ipfs/p2p/test/util"
17 18 19
	mockrouting "github.com/ipfs/go-ipfs/routing/mock"
	delay "github.com/ipfs/go-ipfs/thirdparty/delay"
	u "github.com/ipfs/go-ipfs/util"
Brian Tiger Chow's avatar
Brian Tiger Chow committed
20 21
)

Brian Tiger Chow's avatar
Brian Tiger Chow committed
22 23
// FIXME the tests are really sensitive to the network delay. fix them to work
// well under varying conditions
24 25
// kNetworkDelay is the fixed delay handed to delay.Fixed when the tests in
// this file build their virtual network; it is currently zero.
const kNetworkDelay = 0 * time.Millisecond

26
func TestClose(t *testing.T) {
27
	vnet := tn.VirtualNetwork(mockrouting.NewServer(), delay.Fixed(kNetworkDelay))
28
	sesgen := NewTestSessionGenerator(vnet)
Jeromy's avatar
Jeromy committed
29
	defer sesgen.Close()
30
	bgen := blocksutil.NewBlockGenerator()
31 32 33 34

	block := bgen.Next()
	bitswap := sesgen.Next()

35 36
	bitswap.Exchange.Close()
	bitswap.Exchange.GetBlock(context.Background(), block.Key())
37 38
}

Jeromy's avatar
Jeromy committed
39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60
// TestProviderForKeyButNetworkCannotFind registers a bogus identity as a
// provider of a block with the mock routing server, without ever creating an
// instance for that identity on the virtual network, and expects GetBlock on
// a lone instance to fail with context.DeadlineExceeded.
func TestProviderForKeyButNetworkCannotFind(t *testing.T) { // TODO revisit this

	rs := mockrouting.NewServer()
	net := tn.VirtualNetwork(rs, delay.Fixed(kNetworkDelay))
	g := NewTestSessionGenerator(net)
	defer g.Close()

	block := blocks.NewBlock([]byte("block"))
	pinfo := p2ptestutil.RandTestBogusIdentityOrFatal(t)
	rs.Client(pinfo).Provide(context.Background(), block.Key()) // but not on network

	solo := g.Next()
	defer solo.Exchange.Close()

	// Keep and call the cancel func so the timeout's resources are released
	// when the test returns instead of being leaked.
	ctx, cancel := context.WithTimeout(context.Background(), time.Nanosecond)
	defer cancel()
	_, err := solo.Exchange.GetBlock(ctx, block.Key())

	if err != context.DeadlineExceeded {
		t.Fatal("Expected DeadlineExceeded error")
	}
}

Brian Tiger Chow's avatar
Brian Tiger Chow committed
61 62
func TestGetBlockFromPeerAfterPeerAnnounces(t *testing.T) {

63
	net := tn.VirtualNetwork(mockrouting.NewServer(), delay.Fixed(kNetworkDelay))
64
	block := blocks.NewBlock([]byte("block"))
65
	g := NewTestSessionGenerator(net)
Jeromy's avatar
Jeromy committed
66
	defer g.Close()
Brian Tiger Chow's avatar
Brian Tiger Chow committed
67

68 69
	peers := g.Instances(2)
	hasBlock := peers[0]
Jeromy's avatar
Jeromy committed
70
	defer hasBlock.Exchange.Close()
Brian Tiger Chow's avatar
Brian Tiger Chow committed
71

72
	if err := hasBlock.Exchange.HasBlock(context.Background(), block); err != nil {
73 74
		t.Fatal(err)
	}
Brian Tiger Chow's avatar
Brian Tiger Chow committed
75

76
	wantsBlock := peers[1]
Jeromy's avatar
Jeromy committed
77
	defer wantsBlock.Exchange.Close()
Brian Tiger Chow's avatar
Brian Tiger Chow committed
78 79

	ctx, _ := context.WithTimeout(context.Background(), time.Second)
80
	received, err := wantsBlock.Exchange.GetBlock(ctx, block.Key())
Brian Tiger Chow's avatar
Brian Tiger Chow committed
81 82 83 84
	if err != nil {
		t.Log(err)
		t.Fatal("Expected to succeed")
	}
85 86 87 88

	if !bytes.Equal(block.Data, received.Data) {
		t.Fatal("Data doesn't match")
	}
Brian Tiger Chow's avatar
Brian Tiger Chow committed
89 90
}

91
func TestLargeSwarm(t *testing.T) {
92 93 94
	if testing.Short() {
		t.SkipNow()
	}
Jeromy's avatar
Jeromy committed
95
	numInstances := 500
96
	numBlocks := 2
97 98 99 100
	if detectrace.WithRace() {
		// when running with the race detector, 500 instances launches
		// well over 8k goroutines. This hits a race detector limit.
		numInstances = 100
101 102
	} else if travis.IsRunning() {
		numInstances = 200
103 104 105
	} else {
		t.Parallel()
	}
106 107
	PerformDistributionTest(t, numInstances, numBlocks)
}
108

109 110 111
func TestLargeFile(t *testing.T) {
	if testing.Short() {
		t.SkipNow()
112
	}
113 114 115 116 117

	if !travis.IsRunning() {
		t.Parallel()
	}

118 119 120
	numInstances := 10
	numBlocks := 100
	PerformDistributionTest(t, numInstances, numBlocks)
121 122
}

123 124 125 126 127 128 129 130 131 132
// TestLargeFileTwoPeers runs the distribution test with 2 instances and 100
// blocks. It is skipped in -short mode and always runs in parallel.
func TestLargeFileTwoPeers(t *testing.T) {
	if testing.Short() {
		t.SkipNow()
	}
	t.Parallel()

	const (
		numInstances = 2
		numBlocks    = 100
	)
	PerformDistributionTest(t, numInstances, numBlocks)
}

133
func PerformDistributionTest(t *testing.T, numInstances, numBlocks int) {
134 135 136
	if testing.Short() {
		t.SkipNow()
	}
137
	net := tn.VirtualNetwork(mockrouting.NewServer(), delay.Fixed(kNetworkDelay))
138
	sg := NewTestSessionGenerator(net)
Jeromy's avatar
Jeromy committed
139
	defer sg.Close()
140
	bg := blocksutil.NewBlockGenerator()
141 142 143 144 145 146

	instances := sg.Instances(numInstances)
	blocks := bg.Blocks(numBlocks)

	t.Log("Give the blocks to the first instance")

Jeromy's avatar
Jeromy committed
147
	var blkeys []u.Key
148 149
	first := instances[0]
	for _, b := range blocks {
Jeromy's avatar
Jeromy committed
150
		blkeys = append(blkeys, b.Key())
151
		first.Exchange.HasBlock(context.Background(), b)
152 153 154 155
	}

	t.Log("Distribute!")

Jeromy's avatar
Jeromy committed
156
	wg := sync.WaitGroup{}
157
	for _, inst := range instances[1:] {
Jeromy's avatar
Jeromy committed
158 159 160 161 162 163 164 165 166 167
		wg.Add(1)
		go func(inst Instance) {
			defer wg.Done()
			outch, err := inst.Exchange.GetBlocks(context.TODO(), blkeys)
			if err != nil {
				t.Fatal(err)
			}
			for _ = range outch {
			}
		}(inst)
168 169 170 171 172 173 174
	}
	wg.Wait()

	t.Log("Verify!")

	for _, inst := range instances {
		for _, b := range blocks {
175
			if _, err := inst.Blockstore().Get(b.Key()); err != nil {
176 177 178 179 180 181
				t.Fatal(err)
			}
		}
	}
}

182
func getOrFail(bitswap Instance, b *blocks.Block, t *testing.T, wg *sync.WaitGroup) {
183
	if _, err := bitswap.Blockstore().Get(b.Key()); err != nil {
184
		_, err := bitswap.Exchange.GetBlock(context.Background(), b.Key())
185 186 187 188 189 190 191
		if err != nil {
			t.Fatal(err)
		}
	}
	wg.Done()
}

192
// TODO simplify this test. get to the _essence_!
193
func TestSendToWantingPeer(t *testing.T) {
Brian Tiger Chow's avatar
Brian Tiger Chow committed
194 195 196 197
	if testing.Short() {
		t.SkipNow()
	}

198
	net := tn.VirtualNetwork(mockrouting.NewServer(), delay.Fixed(kNetworkDelay))
199
	sg := NewTestSessionGenerator(net)
Jeromy's avatar
Jeromy committed
200
	defer sg.Close()
201
	bg := blocksutil.NewBlockGenerator()
202

Brian Tiger Chow's avatar
Brian Tiger Chow committed
203 204
	prev := rebroadcastDelay.Set(time.Second / 2)
	defer func() { rebroadcastDelay.Set(prev) }()
205

206 207 208
	peers := sg.Instances(2)
	peerA := peers[0]
	peerB := peers[1]
209

210 211
	t.Logf("Session %v\n", peerA.Peer)
	t.Logf("Session %v\n", peerB.Peer)
212

213 214
	timeout := time.Second
	waitTime := time.Second * 5
215

216 217 218 219 220
	alpha := bg.Next()
	// peerA requests and waits for block alpha
	ctx, _ := context.WithTimeout(context.TODO(), waitTime)
	alphaPromise, err := peerA.Exchange.GetBlocks(ctx, []u.Key{alpha.Key()})
	if err != nil {
221 222
		t.Fatal(err)
	}
223

224 225 226 227
	// peerB announces to the network that he has block alpha
	ctx, _ = context.WithTimeout(context.TODO(), timeout)
	err = peerB.Exchange.HasBlock(ctx, alpha)
	if err != nil {
228 229
		t.Fatal(err)
	}
230

231 232 233 234
	// At some point, peerA should get alpha (or timeout)
	blkrecvd, ok := <-alphaPromise
	if !ok {
		t.Fatal("context timed out and broke promise channel!")
235
	}
236

237 238
	if blkrecvd.Key() != alpha.Key() {
		t.Fatal("Wrong block!")
239
	}
240

241
}
Jeromy's avatar
Jeromy committed
242 243

func TestBasicBitswap(t *testing.T) {
244
	net := tn.VirtualNetwork(mockrouting.NewServer(), delay.Fixed(kNetworkDelay))
245
	sg := NewTestSessionGenerator(net)
246
	defer sg.Close()
Jeromy's avatar
Jeromy committed
247 248
	bg := blocksutil.NewBlockGenerator()

249
	t.Log("Test a one node trying to get one block from another")
Jeromy's avatar
Jeromy committed
250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271

	instances := sg.Instances(2)
	blocks := bg.Blocks(1)
	err := instances[0].Exchange.HasBlock(context.TODO(), blocks[0])
	if err != nil {
		t.Fatal(err)
	}

	ctx, _ := context.WithTimeout(context.TODO(), time.Second*5)
	blk, err := instances[1].Exchange.GetBlock(ctx, blocks[0].Key())
	if err != nil {
		t.Fatal(err)
	}

	t.Log(blk)
	for _, inst := range instances {
		err := inst.Exchange.Close()
		if err != nil {
			t.Fatal(err)
		}
	}
}