bitswap_test.go 6.42 KB
Newer Older
1
package bitswap
Brian Tiger Chow's avatar
Brian Tiger Chow committed
2 3

import (
4
	"bytes"
5
	"sync"
Brian Tiger Chow's avatar
Brian Tiger Chow committed
6 7 8
	"testing"
	"time"

9 10 11 12 13 14 15 16 17 18
	detectrace "github.com/ipfs/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-detect-race"
	context "github.com/ipfs/go-ipfs/Godeps/_workspace/src/golang.org/x/net/context"

	blocks "github.com/ipfs/go-ipfs/blocks"
	blocksutil "github.com/ipfs/go-ipfs/blocks/blocksutil"
	tn "github.com/ipfs/go-ipfs/exchange/bitswap/testnet"
	p2ptestutil "github.com/ipfs/go-ipfs/p2p/test/util"
	mockrouting "github.com/ipfs/go-ipfs/routing/mock"
	delay "github.com/ipfs/go-ipfs/thirdparty/delay"
	u "github.com/ipfs/go-ipfs/util"
Brian Tiger Chow's avatar
Brian Tiger Chow committed
19 20
)

Brian Tiger Chow's avatar
Brian Tiger Chow committed
21 22
// kNetworkDelay is the fixed delay applied to every virtual network built by
// the tests in this file.
//
// FIXME: the tests are really sensitive to the network delay. Fix them to work
// well under varying conditions.
const kNetworkDelay = 0 * time.Millisecond

25
func TestClose(t *testing.T) {
26
	vnet := tn.VirtualNetwork(mockrouting.NewServer(), delay.Fixed(kNetworkDelay))
27
	sesgen := NewTestSessionGenerator(vnet)
Jeromy's avatar
Jeromy committed
28
	defer sesgen.Close()
29
	bgen := blocksutil.NewBlockGenerator()
30 31 32 33

	block := bgen.Next()
	bitswap := sesgen.Next()

34 35
	bitswap.Exchange.Close()
	bitswap.Exchange.GetBlock(context.Background(), block.Key())
36 37
}

38
func TestProviderForKeyButNetworkCannotFind(t *testing.T) { // TODO revisit this
Brian Tiger Chow's avatar
Brian Tiger Chow committed
39

Brian Tiger Chow's avatar
Brian Tiger Chow committed
40
	rs := mockrouting.NewServer()
41
	net := tn.VirtualNetwork(rs, delay.Fixed(kNetworkDelay))
42
	g := NewTestSessionGenerator(net)
Jeromy's avatar
Jeromy committed
43
	defer g.Close()
Brian Tiger Chow's avatar
Brian Tiger Chow committed
44

45
	block := blocks.NewBlock([]byte("block"))
46
	pinfo := p2ptestutil.RandTestBogusIdentityOrFatal(t)
47
	rs.Client(pinfo).Provide(context.Background(), block.Key()) // but not on network
Brian Tiger Chow's avatar
Brian Tiger Chow committed
48

49
	solo := g.Next()
Jeromy's avatar
Jeromy committed
50
	defer solo.Exchange.Close()
Brian Tiger Chow's avatar
Brian Tiger Chow committed
51 52

	ctx, _ := context.WithTimeout(context.Background(), time.Nanosecond)
53
	_, err := solo.Exchange.GetBlock(ctx, block.Key())
Brian Tiger Chow's avatar
Brian Tiger Chow committed
54

Brian Tiger Chow's avatar
Brian Tiger Chow committed
55 56 57 58 59
	if err != context.DeadlineExceeded {
		t.Fatal("Expected DeadlineExceeded error")
	}
}

Brian Tiger Chow's avatar
Brian Tiger Chow committed
60 61 62 63
// TestGetBlockAfterRequesting...

func TestGetBlockFromPeerAfterPeerAnnounces(t *testing.T) {

64
	net := tn.VirtualNetwork(mockrouting.NewServer(), delay.Fixed(kNetworkDelay))
65
	block := blocks.NewBlock([]byte("block"))
66
	g := NewTestSessionGenerator(net)
Jeromy's avatar
Jeromy committed
67
	defer g.Close()
Brian Tiger Chow's avatar
Brian Tiger Chow committed
68

69
	hasBlock := g.Next()
Jeromy's avatar
Jeromy committed
70
	defer hasBlock.Exchange.Close()
Brian Tiger Chow's avatar
Brian Tiger Chow committed
71

72
	if err := hasBlock.Blockstore().Put(block); err != nil {
73 74
		t.Fatal(err)
	}
75
	if err := hasBlock.Exchange.HasBlock(context.Background(), block); err != nil {
76 77
		t.Fatal(err)
	}
Brian Tiger Chow's avatar
Brian Tiger Chow committed
78

79
	wantsBlock := g.Next()
Jeromy's avatar
Jeromy committed
80
	defer wantsBlock.Exchange.Close()
Brian Tiger Chow's avatar
Brian Tiger Chow committed
81 82

	ctx, _ := context.WithTimeout(context.Background(), time.Second)
83
	received, err := wantsBlock.Exchange.GetBlock(ctx, block.Key())
Brian Tiger Chow's avatar
Brian Tiger Chow committed
84 85 86 87
	if err != nil {
		t.Log(err)
		t.Fatal("Expected to succeed")
	}
88 89 90 91

	if !bytes.Equal(block.Data, received.Data) {
		t.Fatal("Data doesn't match")
	}
Brian Tiger Chow's avatar
Brian Tiger Chow committed
92 93
}

94
func TestLargeSwarm(t *testing.T) {
95 96 97
	if testing.Short() {
		t.SkipNow()
	}
Jeromy's avatar
Jeromy committed
98
	numInstances := 500
99
	numBlocks := 2
100 101 102 103 104 105 106
	if detectrace.WithRace() {
		// when running with the race detector, 500 instances launches
		// well over 8k goroutines. This hits a race detector limit.
		numInstances = 100
	} else {
		t.Parallel()
	}
107 108
	PerformDistributionTest(t, numInstances, numBlocks)
}
109

110 111 112
func TestLargeFile(t *testing.T) {
	if testing.Short() {
		t.SkipNow()
113
	}
114 115 116 117
	t.Parallel()
	numInstances := 10
	numBlocks := 100
	PerformDistributionTest(t, numInstances, numBlocks)
118 119
}

120
func PerformDistributionTest(t *testing.T, numInstances, numBlocks int) {
121 122 123
	if testing.Short() {
		t.SkipNow()
	}
124
	net := tn.VirtualNetwork(mockrouting.NewServer(), delay.Fixed(kNetworkDelay))
125
	sg := NewTestSessionGenerator(net)
Jeromy's avatar
Jeromy committed
126
	defer sg.Close()
127
	bg := blocksutil.NewBlockGenerator()
128 129 130 131 132 133 134 135

	t.Log("Test a few nodes trying to get one file with a lot of blocks")

	instances := sg.Instances(numInstances)
	blocks := bg.Blocks(numBlocks)

	t.Log("Give the blocks to the first instance")

Jeromy's avatar
Jeromy committed
136
	var blkeys []u.Key
137 138
	first := instances[0]
	for _, b := range blocks {
139
		first.Blockstore().Put(b) // TODO remove. don't need to do this. bitswap owns block
Jeromy's avatar
Jeromy committed
140
		blkeys = append(blkeys, b.Key())
141
		first.Exchange.HasBlock(context.Background(), b)
142 143 144 145
	}

	t.Log("Distribute!")

Jeromy's avatar
Jeromy committed
146
	wg := sync.WaitGroup{}
147
	for _, inst := range instances {
Jeromy's avatar
Jeromy committed
148 149 150 151 152 153 154 155 156 157
		wg.Add(1)
		go func(inst Instance) {
			defer wg.Done()
			outch, err := inst.Exchange.GetBlocks(context.TODO(), blkeys)
			if err != nil {
				t.Fatal(err)
			}
			for _ = range outch {
			}
		}(inst)
158 159 160 161 162 163 164
	}
	wg.Wait()

	t.Log("Verify!")

	for _, inst := range instances {
		for _, b := range blocks {
165
			if _, err := inst.Blockstore().Get(b.Key()); err != nil {
166 167 168 169 170 171
				t.Fatal(err)
			}
		}
	}
}

172
func getOrFail(bitswap Instance, b *blocks.Block, t *testing.T, wg *sync.WaitGroup) {
173
	if _, err := bitswap.Blockstore().Get(b.Key()); err != nil {
174
		_, err := bitswap.Exchange.GetBlock(context.Background(), b.Key())
175 176 177 178 179 180 181
		if err != nil {
			t.Fatal(err)
		}
	}
	wg.Done()
}

182
// TODO simplify this test. get to the _essence_!
183
func TestSendToWantingPeer(t *testing.T) {
Brian Tiger Chow's avatar
Brian Tiger Chow committed
184 185 186 187
	if testing.Short() {
		t.SkipNow()
	}

188
	net := tn.VirtualNetwork(mockrouting.NewServer(), delay.Fixed(kNetworkDelay))
189
	sg := NewTestSessionGenerator(net)
Jeromy's avatar
Jeromy committed
190
	defer sg.Close()
191
	bg := blocksutil.NewBlockGenerator()
192

Brian Tiger Chow's avatar
Brian Tiger Chow committed
193 194
	prev := rebroadcastDelay.Set(time.Second / 2)
	defer func() { rebroadcastDelay.Set(prev) }()
195

196 197
	peerA := sg.Next()
	peerB := sg.Next()
198

199 200
	t.Logf("Session %v\n", peerA.Peer)
	t.Logf("Session %v\n", peerB.Peer)
201

202 203
	timeout := time.Second
	waitTime := time.Second * 5
204

205 206 207 208 209
	alpha := bg.Next()
	// peerA requests and waits for block alpha
	ctx, _ := context.WithTimeout(context.TODO(), waitTime)
	alphaPromise, err := peerA.Exchange.GetBlocks(ctx, []u.Key{alpha.Key()})
	if err != nil {
210 211
		t.Fatal(err)
	}
212

213 214 215 216
	// peerB announces to the network that he has block alpha
	ctx, _ = context.WithTimeout(context.TODO(), timeout)
	err = peerB.Exchange.HasBlock(ctx, alpha)
	if err != nil {
217 218
		t.Fatal(err)
	}
219

220 221 222 223
	// At some point, peerA should get alpha (or timeout)
	blkrecvd, ok := <-alphaPromise
	if !ok {
		t.Fatal("context timed out and broke promise channel!")
224
	}
225

226 227
	if blkrecvd.Key() != alpha.Key() {
		t.Fatal("Wrong block!")
228
	}
229

230
}
Jeromy's avatar
Jeromy committed
231 232

func TestBasicBitswap(t *testing.T) {
233
	net := tn.VirtualNetwork(mockrouting.NewServer(), delay.Fixed(kNetworkDelay))
234
	sg := NewTestSessionGenerator(net)
235
	defer sg.Close()
Jeromy's avatar
Jeromy committed
236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260
	bg := blocksutil.NewBlockGenerator()

	t.Log("Test a few nodes trying to get one file with a lot of blocks")

	instances := sg.Instances(2)
	blocks := bg.Blocks(1)
	err := instances[0].Exchange.HasBlock(context.TODO(), blocks[0])
	if err != nil {
		t.Fatal(err)
	}

	ctx, _ := context.WithTimeout(context.TODO(), time.Second*5)
	blk, err := instances[1].Exchange.GetBlock(ctx, blocks[0].Key())
	if err != nil {
		t.Fatal(err)
	}

	t.Log(blk)
	for _, inst := range instances {
		err := inst.Exchange.Close()
		if err != nil {
			t.Fatal(err)
		}
	}
}