bitswap_test.go 6.27 KB
Newer Older
1
package bitswap
Brian Tiger Chow's avatar
Brian Tiger Chow committed
2 3

import (
4
	"bytes"
5
	"sync"
Brian Tiger Chow's avatar
Brian Tiger Chow committed
6 7 8
	"testing"
	"time"

9 10 11 12 13 14 15 16 17 18
	detectrace "github.com/ipfs/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-detect-race"
	context "github.com/ipfs/go-ipfs/Godeps/_workspace/src/golang.org/x/net/context"

	blocks "github.com/ipfs/go-ipfs/blocks"
	blocksutil "github.com/ipfs/go-ipfs/blocks/blocksutil"
	tn "github.com/ipfs/go-ipfs/exchange/bitswap/testnet"
	p2ptestutil "github.com/ipfs/go-ipfs/p2p/test/util"
	mockrouting "github.com/ipfs/go-ipfs/routing/mock"
	delay "github.com/ipfs/go-ipfs/thirdparty/delay"
	u "github.com/ipfs/go-ipfs/util"
Brian Tiger Chow's avatar
Brian Tiger Chow committed
19 20
)

Brian Tiger Chow's avatar
Brian Tiger Chow committed
21 22
// FIXME the tests are really sensitive to the network delay. fix them to work
// well under varying conditions
23 24
const kNetworkDelay = 0 * time.Millisecond

25
func TestClose(t *testing.T) {
26
	vnet := tn.VirtualNetwork(mockrouting.NewServer(), delay.Fixed(kNetworkDelay))
27
	sesgen := NewTestSessionGenerator(vnet)
Jeromy's avatar
Jeromy committed
28
	defer sesgen.Close()
29
	bgen := blocksutil.NewBlockGenerator()
30 31 32 33

	block := bgen.Next()
	bitswap := sesgen.Next()

34 35
	bitswap.Exchange.Close()
	bitswap.Exchange.GetBlock(context.Background(), block.Key())
36 37
}

38
func TestProviderForKeyButNetworkCannotFind(t *testing.T) { // TODO revisit this
Brian Tiger Chow's avatar
Brian Tiger Chow committed
39

Brian Tiger Chow's avatar
Brian Tiger Chow committed
40
	rs := mockrouting.NewServer()
41
	net := tn.VirtualNetwork(rs, delay.Fixed(kNetworkDelay))
42
	g := NewTestSessionGenerator(net)
Jeromy's avatar
Jeromy committed
43
	defer g.Close()
Brian Tiger Chow's avatar
Brian Tiger Chow committed
44

45
	block := blocks.NewBlock([]byte("block"))
46
	pinfo := p2ptestutil.RandTestBogusIdentityOrFatal(t)
47
	rs.Client(pinfo).Provide(context.Background(), block.Key()) // but not on network
Brian Tiger Chow's avatar
Brian Tiger Chow committed
48

49
	solo := g.Next()
Jeromy's avatar
Jeromy committed
50
	defer solo.Exchange.Close()
Brian Tiger Chow's avatar
Brian Tiger Chow committed
51 52

	ctx, _ := context.WithTimeout(context.Background(), time.Nanosecond)
53
	_, err := solo.Exchange.GetBlock(ctx, block.Key())
Brian Tiger Chow's avatar
Brian Tiger Chow committed
54

Brian Tiger Chow's avatar
Brian Tiger Chow committed
55 56 57 58 59
	if err != context.DeadlineExceeded {
		t.Fatal("Expected DeadlineExceeded error")
	}
}

Brian Tiger Chow's avatar
Brian Tiger Chow committed
60 61 62 63
// TestGetBlockAfterRequesting...

func TestGetBlockFromPeerAfterPeerAnnounces(t *testing.T) {

64
	net := tn.VirtualNetwork(mockrouting.NewServer(), delay.Fixed(kNetworkDelay))
65
	block := blocks.NewBlock([]byte("block"))
66
	g := NewTestSessionGenerator(net)
Jeromy's avatar
Jeromy committed
67
	defer g.Close()
Brian Tiger Chow's avatar
Brian Tiger Chow committed
68

69
	hasBlock := g.Next()
Jeromy's avatar
Jeromy committed
70
	defer hasBlock.Exchange.Close()
Brian Tiger Chow's avatar
Brian Tiger Chow committed
71

72
	if err := hasBlock.Exchange.HasBlock(context.Background(), block); err != nil {
73 74
		t.Fatal(err)
	}
Brian Tiger Chow's avatar
Brian Tiger Chow committed
75

76
	wantsBlock := g.Next()
Jeromy's avatar
Jeromy committed
77
	defer wantsBlock.Exchange.Close()
Brian Tiger Chow's avatar
Brian Tiger Chow committed
78 79

	ctx, _ := context.WithTimeout(context.Background(), time.Second)
80
	received, err := wantsBlock.Exchange.GetBlock(ctx, block.Key())
Brian Tiger Chow's avatar
Brian Tiger Chow committed
81 82 83 84
	if err != nil {
		t.Log(err)
		t.Fatal("Expected to succeed")
	}
85 86 87 88

	if !bytes.Equal(block.Data, received.Data) {
		t.Fatal("Data doesn't match")
	}
Brian Tiger Chow's avatar
Brian Tiger Chow committed
89 90
}

91
func TestLargeSwarm(t *testing.T) {
92 93 94
	if testing.Short() {
		t.SkipNow()
	}
Jeromy's avatar
Jeromy committed
95
	numInstances := 500
96
	numBlocks := 2
97 98 99 100 101 102 103
	if detectrace.WithRace() {
		// when running with the race detector, 500 instances launches
		// well over 8k goroutines. This hits a race detector limit.
		numInstances = 100
	} else {
		t.Parallel()
	}
104 105
	PerformDistributionTest(t, numInstances, numBlocks)
}
106

107 108 109
func TestLargeFile(t *testing.T) {
	if testing.Short() {
		t.SkipNow()
110
	}
111 112 113 114
	t.Parallel()
	numInstances := 10
	numBlocks := 100
	PerformDistributionTest(t, numInstances, numBlocks)
115 116
}

117
func PerformDistributionTest(t *testing.T, numInstances, numBlocks int) {
118 119 120
	if testing.Short() {
		t.SkipNow()
	}
121
	net := tn.VirtualNetwork(mockrouting.NewServer(), delay.Fixed(kNetworkDelay))
122
	sg := NewTestSessionGenerator(net)
Jeromy's avatar
Jeromy committed
123
	defer sg.Close()
124
	bg := blocksutil.NewBlockGenerator()
125 126 127 128 129 130 131 132

	t.Log("Test a few nodes trying to get one file with a lot of blocks")

	instances := sg.Instances(numInstances)
	blocks := bg.Blocks(numBlocks)

	t.Log("Give the blocks to the first instance")

Jeromy's avatar
Jeromy committed
133
	var blkeys []u.Key
134 135
	first := instances[0]
	for _, b := range blocks {
Jeromy's avatar
Jeromy committed
136
		blkeys = append(blkeys, b.Key())
137
		first.Exchange.HasBlock(context.Background(), b)
138 139 140 141
	}

	t.Log("Distribute!")

Jeromy's avatar
Jeromy committed
142
	wg := sync.WaitGroup{}
143
	for _, inst := range instances[1:] {
Jeromy's avatar
Jeromy committed
144 145 146 147 148 149 150 151 152 153
		wg.Add(1)
		go func(inst Instance) {
			defer wg.Done()
			outch, err := inst.Exchange.GetBlocks(context.TODO(), blkeys)
			if err != nil {
				t.Fatal(err)
			}
			for _ = range outch {
			}
		}(inst)
154 155 156 157 158 159 160
	}
	wg.Wait()

	t.Log("Verify!")

	for _, inst := range instances {
		for _, b := range blocks {
161
			if _, err := inst.Blockstore().Get(b.Key()); err != nil {
162 163 164 165 166 167
				t.Fatal(err)
			}
		}
	}
}

168
func getOrFail(bitswap Instance, b *blocks.Block, t *testing.T, wg *sync.WaitGroup) {
169
	if _, err := bitswap.Blockstore().Get(b.Key()); err != nil {
170
		_, err := bitswap.Exchange.GetBlock(context.Background(), b.Key())
171 172 173 174 175 176 177
		if err != nil {
			t.Fatal(err)
		}
	}
	wg.Done()
}

178
// TODO simplify this test. get to the _essence_!
179
func TestSendToWantingPeer(t *testing.T) {
Brian Tiger Chow's avatar
Brian Tiger Chow committed
180 181 182 183
	if testing.Short() {
		t.SkipNow()
	}

184
	net := tn.VirtualNetwork(mockrouting.NewServer(), delay.Fixed(kNetworkDelay))
185
	sg := NewTestSessionGenerator(net)
Jeromy's avatar
Jeromy committed
186
	defer sg.Close()
187
	bg := blocksutil.NewBlockGenerator()
188

Brian Tiger Chow's avatar
Brian Tiger Chow committed
189 190
	prev := rebroadcastDelay.Set(time.Second / 2)
	defer func() { rebroadcastDelay.Set(prev) }()
191

192 193
	peerA := sg.Next()
	peerB := sg.Next()
194

195 196
	t.Logf("Session %v\n", peerA.Peer)
	t.Logf("Session %v\n", peerB.Peer)
197

198 199
	timeout := time.Second
	waitTime := time.Second * 5
200

201 202 203 204 205
	alpha := bg.Next()
	// peerA requests and waits for block alpha
	ctx, _ := context.WithTimeout(context.TODO(), waitTime)
	alphaPromise, err := peerA.Exchange.GetBlocks(ctx, []u.Key{alpha.Key()})
	if err != nil {
206 207
		t.Fatal(err)
	}
208

209 210 211 212
	// peerB announces to the network that he has block alpha
	ctx, _ = context.WithTimeout(context.TODO(), timeout)
	err = peerB.Exchange.HasBlock(ctx, alpha)
	if err != nil {
213 214
		t.Fatal(err)
	}
215

216 217 218 219
	// At some point, peerA should get alpha (or timeout)
	blkrecvd, ok := <-alphaPromise
	if !ok {
		t.Fatal("context timed out and broke promise channel!")
220
	}
221

222 223
	if blkrecvd.Key() != alpha.Key() {
		t.Fatal("Wrong block!")
224
	}
225

226
}
Jeromy's avatar
Jeromy committed
227 228

func TestBasicBitswap(t *testing.T) {
229
	net := tn.VirtualNetwork(mockrouting.NewServer(), delay.Fixed(kNetworkDelay))
230
	sg := NewTestSessionGenerator(net)
231
	defer sg.Close()
Jeromy's avatar
Jeromy committed
232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256
	bg := blocksutil.NewBlockGenerator()

	t.Log("Test a few nodes trying to get one file with a lot of blocks")

	instances := sg.Instances(2)
	blocks := bg.Blocks(1)
	err := instances[0].Exchange.HasBlock(context.TODO(), blocks[0])
	if err != nil {
		t.Fatal(err)
	}

	ctx, _ := context.WithTimeout(context.TODO(), time.Second*5)
	blk, err := instances[1].Exchange.GetBlock(ctx, blocks[0].Key())
	if err != nil {
		t.Fatal(err)
	}

	t.Log(blk)
	for _, inst := range instances {
		err := inst.Exchange.Close()
		if err != nil {
			t.Fatal(err)
		}
	}
}