bitswap_test.go 6.94 KB
Newer Older
1
package bitswap
Brian Tiger Chow's avatar
Brian Tiger Chow committed
2 3

import (
4
	"bytes"
5
	"sync"
Brian Tiger Chow's avatar
Brian Tiger Chow committed
6 7 8
	"testing"
	"time"

9 10
	detectrace "github.com/ipfs/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-detect-race"
	context "github.com/ipfs/go-ipfs/Godeps/_workspace/src/golang.org/x/net/context"
11
	travis "github.com/ipfs/go-ipfs/util/testutil/ci/travis"
12 13 14

	blocks "github.com/ipfs/go-ipfs/blocks"
	blocksutil "github.com/ipfs/go-ipfs/blocks/blocksutil"
15
	key "github.com/ipfs/go-ipfs/blocks/key"
16
	tn "github.com/ipfs/go-ipfs/exchange/bitswap/testnet"
Jeromy's avatar
Jeromy committed
17
	p2ptestutil "github.com/ipfs/go-ipfs/p2p/test/util"
18 19
	mockrouting "github.com/ipfs/go-ipfs/routing/mock"
	delay "github.com/ipfs/go-ipfs/thirdparty/delay"
Brian Tiger Chow's avatar
Brian Tiger Chow committed
20 21
)

Brian Tiger Chow's avatar
Brian Tiger Chow committed
22 23
// FIXME the tests are really sensitive to the network delay. fix them to work
// well under varying conditions
24 25
const kNetworkDelay = 0 * time.Millisecond

26
func TestClose(t *testing.T) {
27
	vnet := tn.VirtualNetwork(mockrouting.NewServer(), delay.Fixed(kNetworkDelay))
28
	sesgen := NewTestSessionGenerator(vnet)
Jeromy's avatar
Jeromy committed
29
	defer sesgen.Close()
30
	bgen := blocksutil.NewBlockGenerator()
31 32 33 34

	block := bgen.Next()
	bitswap := sesgen.Next()

35 36
	bitswap.Exchange.Close()
	bitswap.Exchange.GetBlock(context.Background(), block.Key())
37 38
}

Jeromy's avatar
Jeromy committed
39 40 41 42 43 44 45 46 47 48 49 50 51 52
func TestProviderForKeyButNetworkCannotFind(t *testing.T) { // TODO revisit this

	rs := mockrouting.NewServer()
	net := tn.VirtualNetwork(rs, delay.Fixed(kNetworkDelay))
	g := NewTestSessionGenerator(net)
	defer g.Close()

	block := blocks.NewBlock([]byte("block"))
	pinfo := p2ptestutil.RandTestBogusIdentityOrFatal(t)
	rs.Client(pinfo).Provide(context.Background(), block.Key()) // but not on network

	solo := g.Next()
	defer solo.Exchange.Close()

rht's avatar
rht committed
53 54
	ctx, cancel := context.WithTimeout(context.Background(), time.Nanosecond)
	defer cancel()
Jeromy's avatar
Jeromy committed
55 56 57 58 59 60 61
	_, err := solo.Exchange.GetBlock(ctx, block.Key())

	if err != context.DeadlineExceeded {
		t.Fatal("Expected DeadlineExceeded error")
	}
}

Brian Tiger Chow's avatar
Brian Tiger Chow committed
62 63
func TestGetBlockFromPeerAfterPeerAnnounces(t *testing.T) {

64
	net := tn.VirtualNetwork(mockrouting.NewServer(), delay.Fixed(kNetworkDelay))
65
	block := blocks.NewBlock([]byte("block"))
66
	g := NewTestSessionGenerator(net)
Jeromy's avatar
Jeromy committed
67
	defer g.Close()
Brian Tiger Chow's avatar
Brian Tiger Chow committed
68

69 70
	peers := g.Instances(2)
	hasBlock := peers[0]
Jeromy's avatar
Jeromy committed
71
	defer hasBlock.Exchange.Close()
Brian Tiger Chow's avatar
Brian Tiger Chow committed
72

73
	if err := hasBlock.Exchange.HasBlock(context.Background(), block); err != nil {
74 75
		t.Fatal(err)
	}
Brian Tiger Chow's avatar
Brian Tiger Chow committed
76

77
	wantsBlock := peers[1]
Jeromy's avatar
Jeromy committed
78
	defer wantsBlock.Exchange.Close()
Brian Tiger Chow's avatar
Brian Tiger Chow committed
79

rht's avatar
rht committed
80 81
	ctx, cancel := context.WithTimeout(context.Background(), time.Second)
	defer cancel()
82
	received, err := wantsBlock.Exchange.GetBlock(ctx, block.Key())
Brian Tiger Chow's avatar
Brian Tiger Chow committed
83 84 85 86
	if err != nil {
		t.Log(err)
		t.Fatal("Expected to succeed")
	}
87 88 89 90

	if !bytes.Equal(block.Data, received.Data) {
		t.Fatal("Data doesn't match")
	}
Brian Tiger Chow's avatar
Brian Tiger Chow committed
91 92
}

93
func TestLargeSwarm(t *testing.T) {
94 95 96
	if testing.Short() {
		t.SkipNow()
	}
97
	numInstances := 100
98
	numBlocks := 2
99 100 101 102
	if detectrace.WithRace() {
		// when running with the race detector, 500 instances launches
		// well over 8k goroutines. This hits a race detector limit.
		numInstances = 100
103 104
	} else if travis.IsRunning() {
		numInstances = 200
105 106 107
	} else {
		t.Parallel()
	}
108 109
	PerformDistributionTest(t, numInstances, numBlocks)
}
110

111 112 113
func TestLargeFile(t *testing.T) {
	if testing.Short() {
		t.SkipNow()
114
	}
115 116 117 118 119

	if !travis.IsRunning() {
		t.Parallel()
	}

120 121 122
	numInstances := 10
	numBlocks := 100
	PerformDistributionTest(t, numInstances, numBlocks)
123 124
}

125 126 127 128 129 130 131 132 133 134 135 136
func TestLargeFileNoRebroadcast(t *testing.T) {
	rbd := rebroadcastDelay.Get()
	rebroadcastDelay.Set(time.Hour * 24 * 365 * 10) // ten years should be long enough
	if testing.Short() {
		t.SkipNow()
	}
	numInstances := 10
	numBlocks := 100
	PerformDistributionTest(t, numInstances, numBlocks)
	rebroadcastDelay.Set(rbd)
}

137 138 139 140 141 142 143 144 145
func TestLargeFileTwoPeers(t *testing.T) {
	if testing.Short() {
		t.SkipNow()
	}
	numInstances := 2
	numBlocks := 100
	PerformDistributionTest(t, numInstances, numBlocks)
}

146
func PerformDistributionTest(t *testing.T, numInstances, numBlocks int) {
147 148 149
	if testing.Short() {
		t.SkipNow()
	}
150
	net := tn.VirtualNetwork(mockrouting.NewServer(), delay.Fixed(kNetworkDelay))
151
	sg := NewTestSessionGenerator(net)
Jeromy's avatar
Jeromy committed
152
	defer sg.Close()
153
	bg := blocksutil.NewBlockGenerator()
154 155 156 157 158 159

	instances := sg.Instances(numInstances)
	blocks := bg.Blocks(numBlocks)

	t.Log("Give the blocks to the first instance")

160
	var blkeys []key.Key
161 162
	first := instances[0]
	for _, b := range blocks {
Jeromy's avatar
Jeromy committed
163
		blkeys = append(blkeys, b.Key())
164
		first.Exchange.HasBlock(context.Background(), b)
165 166 167 168
	}

	t.Log("Distribute!")

Jeromy's avatar
Jeromy committed
169
	wg := sync.WaitGroup{}
170
	for _, inst := range instances[1:] {
Jeromy's avatar
Jeromy committed
171 172 173 174 175 176 177 178 179 180
		wg.Add(1)
		go func(inst Instance) {
			defer wg.Done()
			outch, err := inst.Exchange.GetBlocks(context.TODO(), blkeys)
			if err != nil {
				t.Fatal(err)
			}
			for _ = range outch {
			}
		}(inst)
181 182 183 184 185 186 187
	}
	wg.Wait()

	t.Log("Verify!")

	for _, inst := range instances {
		for _, b := range blocks {
188
			if _, err := inst.Blockstore().Get(b.Key()); err != nil {
189 190 191 192 193 194
				t.Fatal(err)
			}
		}
	}
}

195
func getOrFail(bitswap Instance, b *blocks.Block, t *testing.T, wg *sync.WaitGroup) {
196
	if _, err := bitswap.Blockstore().Get(b.Key()); err != nil {
197
		_, err := bitswap.Exchange.GetBlock(context.Background(), b.Key())
198 199 200 201 202 203 204
		if err != nil {
			t.Fatal(err)
		}
	}
	wg.Done()
}

205
// TODO simplify this test. get to the _essence_!
206
func TestSendToWantingPeer(t *testing.T) {
Brian Tiger Chow's avatar
Brian Tiger Chow committed
207 208 209 210
	if testing.Short() {
		t.SkipNow()
	}

211
	net := tn.VirtualNetwork(mockrouting.NewServer(), delay.Fixed(kNetworkDelay))
212
	sg := NewTestSessionGenerator(net)
Jeromy's avatar
Jeromy committed
213
	defer sg.Close()
214
	bg := blocksutil.NewBlockGenerator()
215

Brian Tiger Chow's avatar
Brian Tiger Chow committed
216 217
	prev := rebroadcastDelay.Set(time.Second / 2)
	defer func() { rebroadcastDelay.Set(prev) }()
218

219 220 221
	peers := sg.Instances(2)
	peerA := peers[0]
	peerB := peers[1]
222

223 224
	t.Logf("Session %v\n", peerA.Peer)
	t.Logf("Session %v\n", peerB.Peer)
225

226 227
	timeout := time.Second
	waitTime := time.Second * 5
228

229 230
	alpha := bg.Next()
	// peerA requests and waits for block alpha
rht's avatar
rht committed
231 232
	ctx, cancel := context.WithTimeout(context.TODO(), waitTime)
	defer cancel()
233
	alphaPromise, err := peerA.Exchange.GetBlocks(ctx, []key.Key{alpha.Key()})
234
	if err != nil {
235 236
		t.Fatal(err)
	}
237

238
	// peerB announces to the network that he has block alpha
rht's avatar
rht committed
239 240
	ctx, cancel = context.WithTimeout(context.TODO(), timeout)
	defer cancel()
241 242
	err = peerB.Exchange.HasBlock(ctx, alpha)
	if err != nil {
243 244
		t.Fatal(err)
	}
245

246 247 248 249
	// At some point, peerA should get alpha (or timeout)
	blkrecvd, ok := <-alphaPromise
	if !ok {
		t.Fatal("context timed out and broke promise channel!")
250
	}
251

252 253
	if blkrecvd.Key() != alpha.Key() {
		t.Fatal("Wrong block!")
254
	}
255

256
}
Jeromy's avatar
Jeromy committed
257 258

func TestBasicBitswap(t *testing.T) {
259
	net := tn.VirtualNetwork(mockrouting.NewServer(), delay.Fixed(kNetworkDelay))
260
	sg := NewTestSessionGenerator(net)
261
	defer sg.Close()
Jeromy's avatar
Jeromy committed
262 263
	bg := blocksutil.NewBlockGenerator()

264
	t.Log("Test a one node trying to get one block from another")
Jeromy's avatar
Jeromy committed
265 266 267 268 269 270 271 272

	instances := sg.Instances(2)
	blocks := bg.Blocks(1)
	err := instances[0].Exchange.HasBlock(context.TODO(), blocks[0])
	if err != nil {
		t.Fatal(err)
	}

rht's avatar
rht committed
273 274
	ctx, cancel := context.WithTimeout(context.TODO(), time.Second*5)
	defer cancel()
Jeromy's avatar
Jeromy committed
275 276 277 278 279 280 281 282 283 284 285 286 287
	blk, err := instances[1].Exchange.GetBlock(ctx, blocks[0].Key())
	if err != nil {
		t.Fatal(err)
	}

	t.Log(blk)
	for _, inst := range instances {
		err := inst.Exchange.Close()
		if err != nil {
			t.Fatal(err)
		}
	}
}