bitswap_test.go 6.41 KB
Newer Older
1
package bitswap
Brian Tiger Chow's avatar
Brian Tiger Chow committed
2 3

import (
4
	"bytes"
5
	"sync"
Brian Tiger Chow's avatar
Brian Tiger Chow committed
6 7 8
	"testing"
	"time"

9 10
	detectrace "github.com/ipfs/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-detect-race"
	context "github.com/ipfs/go-ipfs/Godeps/_workspace/src/golang.org/x/net/context"
11
	travis "github.com/ipfs/go-ipfs/util/testutil/ci/travis"
12 13 14 15 16 17 18 19

	blocks "github.com/ipfs/go-ipfs/blocks"
	blocksutil "github.com/ipfs/go-ipfs/blocks/blocksutil"
	tn "github.com/ipfs/go-ipfs/exchange/bitswap/testnet"
	p2ptestutil "github.com/ipfs/go-ipfs/p2p/test/util"
	mockrouting "github.com/ipfs/go-ipfs/routing/mock"
	delay "github.com/ipfs/go-ipfs/thirdparty/delay"
	u "github.com/ipfs/go-ipfs/util"
Brian Tiger Chow's avatar
Brian Tiger Chow committed
20 21
)

Brian Tiger Chow's avatar
Brian Tiger Chow committed
22 23
// FIXME the tests are really sensitive to the network delay. fix them to work
// well under varying conditions
24 25
const kNetworkDelay = 0 * time.Millisecond

26
func TestClose(t *testing.T) {
27
	vnet := tn.VirtualNetwork(mockrouting.NewServer(), delay.Fixed(kNetworkDelay))
28
	sesgen := NewTestSessionGenerator(vnet)
Jeromy's avatar
Jeromy committed
29
	defer sesgen.Close()
30
	bgen := blocksutil.NewBlockGenerator()
31 32 33 34

	block := bgen.Next()
	bitswap := sesgen.Next()

35 36
	bitswap.Exchange.Close()
	bitswap.Exchange.GetBlock(context.Background(), block.Key())
37 38
}

39
func TestProviderForKeyButNetworkCannotFind(t *testing.T) { // TODO revisit this
Brian Tiger Chow's avatar
Brian Tiger Chow committed
40

Brian Tiger Chow's avatar
Brian Tiger Chow committed
41
	rs := mockrouting.NewServer()
42
	net := tn.VirtualNetwork(rs, delay.Fixed(kNetworkDelay))
43
	g := NewTestSessionGenerator(net)
Jeromy's avatar
Jeromy committed
44
	defer g.Close()
Brian Tiger Chow's avatar
Brian Tiger Chow committed
45

46
	block := blocks.NewBlock([]byte("block"))
47
	pinfo := p2ptestutil.RandTestBogusIdentityOrFatal(t)
48
	rs.Client(pinfo).Provide(context.Background(), block.Key()) // but not on network
Brian Tiger Chow's avatar
Brian Tiger Chow committed
49

50
	solo := g.Next()
Jeromy's avatar
Jeromy committed
51
	defer solo.Exchange.Close()
Brian Tiger Chow's avatar
Brian Tiger Chow committed
52 53

	ctx, _ := context.WithTimeout(context.Background(), time.Nanosecond)
54
	_, err := solo.Exchange.GetBlock(ctx, block.Key())
Brian Tiger Chow's avatar
Brian Tiger Chow committed
55

Brian Tiger Chow's avatar
Brian Tiger Chow committed
56 57 58 59 60
	if err != context.DeadlineExceeded {
		t.Fatal("Expected DeadlineExceeded error")
	}
}

Brian Tiger Chow's avatar
Brian Tiger Chow committed
61 62 63 64
// TestGetBlockAfterRequesting...

func TestGetBlockFromPeerAfterPeerAnnounces(t *testing.T) {

65
	net := tn.VirtualNetwork(mockrouting.NewServer(), delay.Fixed(kNetworkDelay))
66
	block := blocks.NewBlock([]byte("block"))
67
	g := NewTestSessionGenerator(net)
Jeromy's avatar
Jeromy committed
68
	defer g.Close()
Brian Tiger Chow's avatar
Brian Tiger Chow committed
69

70
	hasBlock := g.Next()
Jeromy's avatar
Jeromy committed
71
	defer hasBlock.Exchange.Close()
Brian Tiger Chow's avatar
Brian Tiger Chow committed
72

73
	if err := hasBlock.Exchange.HasBlock(context.Background(), block); err != nil {
74 75
		t.Fatal(err)
	}
Brian Tiger Chow's avatar
Brian Tiger Chow committed
76

77
	wantsBlock := g.Next()
Jeromy's avatar
Jeromy committed
78
	defer wantsBlock.Exchange.Close()
Brian Tiger Chow's avatar
Brian Tiger Chow committed
79 80

	ctx, _ := context.WithTimeout(context.Background(), time.Second)
81
	received, err := wantsBlock.Exchange.GetBlock(ctx, block.Key())
Brian Tiger Chow's avatar
Brian Tiger Chow committed
82 83 84 85
	if err != nil {
		t.Log(err)
		t.Fatal("Expected to succeed")
	}
86 87 88 89

	if !bytes.Equal(block.Data, received.Data) {
		t.Fatal("Data doesn't match")
	}
Brian Tiger Chow's avatar
Brian Tiger Chow committed
90 91
}

92
func TestLargeSwarm(t *testing.T) {
93 94 95
	if testing.Short() {
		t.SkipNow()
	}
Jeromy's avatar
Jeromy committed
96
	numInstances := 500
97
	numBlocks := 2
98 99 100 101
	if detectrace.WithRace() {
		// when running with the race detector, 500 instances launches
		// well over 8k goroutines. This hits a race detector limit.
		numInstances = 100
102 103
	} else if travis.IsRunning() {
		numInstances = 200
104 105 106
	} else {
		t.Parallel()
	}
107 108
	PerformDistributionTest(t, numInstances, numBlocks)
}
109

110 111 112
func TestLargeFile(t *testing.T) {
	if testing.Short() {
		t.SkipNow()
113
	}
114 115 116 117 118

	if !travis.IsRunning() {
		t.Parallel()
	}

119 120 121
	numInstances := 10
	numBlocks := 100
	PerformDistributionTest(t, numInstances, numBlocks)
122 123
}

124
func PerformDistributionTest(t *testing.T, numInstances, numBlocks int) {
125 126 127
	if testing.Short() {
		t.SkipNow()
	}
128
	net := tn.VirtualNetwork(mockrouting.NewServer(), delay.Fixed(kNetworkDelay))
129
	sg := NewTestSessionGenerator(net)
Jeromy's avatar
Jeromy committed
130
	defer sg.Close()
131
	bg := blocksutil.NewBlockGenerator()
132 133 134 135 136 137 138 139

	t.Log("Test a few nodes trying to get one file with a lot of blocks")

	instances := sg.Instances(numInstances)
	blocks := bg.Blocks(numBlocks)

	t.Log("Give the blocks to the first instance")

Jeromy's avatar
Jeromy committed
140
	var blkeys []u.Key
141 142
	first := instances[0]
	for _, b := range blocks {
Jeromy's avatar
Jeromy committed
143
		blkeys = append(blkeys, b.Key())
144
		first.Exchange.HasBlock(context.Background(), b)
145 146 147 148
	}

	t.Log("Distribute!")

Jeromy's avatar
Jeromy committed
149
	wg := sync.WaitGroup{}
150
	for _, inst := range instances[1:] {
Jeromy's avatar
Jeromy committed
151 152 153 154 155 156 157 158 159 160
		wg.Add(1)
		go func(inst Instance) {
			defer wg.Done()
			outch, err := inst.Exchange.GetBlocks(context.TODO(), blkeys)
			if err != nil {
				t.Fatal(err)
			}
			for _ = range outch {
			}
		}(inst)
161 162 163 164 165 166 167
	}
	wg.Wait()

	t.Log("Verify!")

	for _, inst := range instances {
		for _, b := range blocks {
168
			if _, err := inst.Blockstore().Get(b.Key()); err != nil {
169 170 171 172 173 174
				t.Fatal(err)
			}
		}
	}
}

175
func getOrFail(bitswap Instance, b *blocks.Block, t *testing.T, wg *sync.WaitGroup) {
176
	if _, err := bitswap.Blockstore().Get(b.Key()); err != nil {
177
		_, err := bitswap.Exchange.GetBlock(context.Background(), b.Key())
178 179 180 181 182 183 184
		if err != nil {
			t.Fatal(err)
		}
	}
	wg.Done()
}

185
// TODO simplify this test. get to the _essence_!
186
func TestSendToWantingPeer(t *testing.T) {
Brian Tiger Chow's avatar
Brian Tiger Chow committed
187 188 189 190
	if testing.Short() {
		t.SkipNow()
	}

191
	net := tn.VirtualNetwork(mockrouting.NewServer(), delay.Fixed(kNetworkDelay))
192
	sg := NewTestSessionGenerator(net)
Jeromy's avatar
Jeromy committed
193
	defer sg.Close()
194
	bg := blocksutil.NewBlockGenerator()
195

Brian Tiger Chow's avatar
Brian Tiger Chow committed
196 197
	prev := rebroadcastDelay.Set(time.Second / 2)
	defer func() { rebroadcastDelay.Set(prev) }()
198

199 200
	peerA := sg.Next()
	peerB := sg.Next()
201

202 203
	t.Logf("Session %v\n", peerA.Peer)
	t.Logf("Session %v\n", peerB.Peer)
204

205 206
	timeout := time.Second
	waitTime := time.Second * 5
207

208 209 210 211 212
	alpha := bg.Next()
	// peerA requests and waits for block alpha
	ctx, _ := context.WithTimeout(context.TODO(), waitTime)
	alphaPromise, err := peerA.Exchange.GetBlocks(ctx, []u.Key{alpha.Key()})
	if err != nil {
213 214
		t.Fatal(err)
	}
215

216 217 218 219
	// peerB announces to the network that he has block alpha
	ctx, _ = context.WithTimeout(context.TODO(), timeout)
	err = peerB.Exchange.HasBlock(ctx, alpha)
	if err != nil {
220 221
		t.Fatal(err)
	}
222

223 224 225 226
	// At some point, peerA should get alpha (or timeout)
	blkrecvd, ok := <-alphaPromise
	if !ok {
		t.Fatal("context timed out and broke promise channel!")
227
	}
228

229 230
	if blkrecvd.Key() != alpha.Key() {
		t.Fatal("Wrong block!")
231
	}
232

233
}
Jeromy's avatar
Jeromy committed
234 235

func TestBasicBitswap(t *testing.T) {
236
	net := tn.VirtualNetwork(mockrouting.NewServer(), delay.Fixed(kNetworkDelay))
237
	sg := NewTestSessionGenerator(net)
238
	defer sg.Close()
Jeromy's avatar
Jeromy committed
239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263
	bg := blocksutil.NewBlockGenerator()

	t.Log("Test a few nodes trying to get one file with a lot of blocks")

	instances := sg.Instances(2)
	blocks := bg.Blocks(1)
	err := instances[0].Exchange.HasBlock(context.TODO(), blocks[0])
	if err != nil {
		t.Fatal(err)
	}

	ctx, _ := context.WithTimeout(context.TODO(), time.Second*5)
	blk, err := instances[1].Exchange.GetBlock(ctx, blocks[0].Key())
	if err != nil {
		t.Fatal(err)
	}

	t.Log(blk)
	for _, inst := range instances {
		err := inst.Exchange.Close()
		if err != nil {
			t.Fatal(err)
		}
	}
}