bitswap_test.go 5.72 KB
Newer Older
1
package bitswap
Brian Tiger Chow's avatar
Brian Tiger Chow committed
2 3

import (
4
	"bytes"
5
	"sync"
Brian Tiger Chow's avatar
Brian Tiger Chow committed
6 7 8
	"testing"
	"time"

9 10
	detectrace "github.com/ipfs/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-detect-race"
	context "github.com/ipfs/go-ipfs/Godeps/_workspace/src/golang.org/x/net/context"
11
	travis "github.com/ipfs/go-ipfs/util/testutil/ci/travis"
12 13 14 15 16 17 18

	blocks "github.com/ipfs/go-ipfs/blocks"
	blocksutil "github.com/ipfs/go-ipfs/blocks/blocksutil"
	tn "github.com/ipfs/go-ipfs/exchange/bitswap/testnet"
	mockrouting "github.com/ipfs/go-ipfs/routing/mock"
	delay "github.com/ipfs/go-ipfs/thirdparty/delay"
	u "github.com/ipfs/go-ipfs/util"
Brian Tiger Chow's avatar
Brian Tiger Chow committed
19 20
)

Brian Tiger Chow's avatar
Brian Tiger Chow committed
21 22
// FIXME the tests are really sensitive to the network delay. fix them to work
// well under varying conditions
23 24
const kNetworkDelay = 0 * time.Millisecond

25
func TestClose(t *testing.T) {
26
	vnet := tn.VirtualNetwork(mockrouting.NewServer(), delay.Fixed(kNetworkDelay))
27
	sesgen := NewTestSessionGenerator(vnet)
Jeromy's avatar
Jeromy committed
28
	defer sesgen.Close()
29
	bgen := blocksutil.NewBlockGenerator()
30 31 32 33

	block := bgen.Next()
	bitswap := sesgen.Next()

34 35
	bitswap.Exchange.Close()
	bitswap.Exchange.GetBlock(context.Background(), block.Key())
36 37
}

Brian Tiger Chow's avatar
Brian Tiger Chow committed
38 39
func TestGetBlockFromPeerAfterPeerAnnounces(t *testing.T) {

40
	net := tn.VirtualNetwork(mockrouting.NewServer(), delay.Fixed(kNetworkDelay))
41
	block := blocks.NewBlock([]byte("block"))
42
	g := NewTestSessionGenerator(net)
Jeromy's avatar
Jeromy committed
43
	defer g.Close()
Brian Tiger Chow's avatar
Brian Tiger Chow committed
44

45 46
	peers := g.Instances(2)
	hasBlock := peers[0]
Jeromy's avatar
Jeromy committed
47
	defer hasBlock.Exchange.Close()
Brian Tiger Chow's avatar
Brian Tiger Chow committed
48

49
	if err := hasBlock.Exchange.HasBlock(context.Background(), block); err != nil {
50 51
		t.Fatal(err)
	}
Brian Tiger Chow's avatar
Brian Tiger Chow committed
52

53
	wantsBlock := peers[1]
Jeromy's avatar
Jeromy committed
54
	defer wantsBlock.Exchange.Close()
Brian Tiger Chow's avatar
Brian Tiger Chow committed
55 56

	ctx, _ := context.WithTimeout(context.Background(), time.Second)
57
	received, err := wantsBlock.Exchange.GetBlock(ctx, block.Key())
Brian Tiger Chow's avatar
Brian Tiger Chow committed
58 59 60 61
	if err != nil {
		t.Log(err)
		t.Fatal("Expected to succeed")
	}
62 63 64 65

	if !bytes.Equal(block.Data, received.Data) {
		t.Fatal("Data doesn't match")
	}
Brian Tiger Chow's avatar
Brian Tiger Chow committed
66 67
}

68
func TestLargeSwarm(t *testing.T) {
69 70 71
	if testing.Short() {
		t.SkipNow()
	}
Jeromy's avatar
Jeromy committed
72
	numInstances := 500
73
	numBlocks := 2
74 75 76 77
	if detectrace.WithRace() {
		// when running with the race detector, 500 instances launches
		// well over 8k goroutines. This hits a race detector limit.
		numInstances = 100
78 79
	} else if travis.IsRunning() {
		numInstances = 200
80 81 82
	} else {
		t.Parallel()
	}
83 84
	PerformDistributionTest(t, numInstances, numBlocks)
}
85

86 87 88
func TestLargeFile(t *testing.T) {
	if testing.Short() {
		t.SkipNow()
89
	}
90 91 92 93 94

	if !travis.IsRunning() {
		t.Parallel()
	}

95 96 97
	numInstances := 10
	numBlocks := 100
	PerformDistributionTest(t, numInstances, numBlocks)
98 99
}

100
func PerformDistributionTest(t *testing.T, numInstances, numBlocks int) {
101 102 103
	if testing.Short() {
		t.SkipNow()
	}
104
	net := tn.VirtualNetwork(mockrouting.NewServer(), delay.Fixed(kNetworkDelay))
105
	sg := NewTestSessionGenerator(net)
Jeromy's avatar
Jeromy committed
106
	defer sg.Close()
107
	bg := blocksutil.NewBlockGenerator()
108 109 110 111 112 113 114 115

	t.Log("Test a few nodes trying to get one file with a lot of blocks")

	instances := sg.Instances(numInstances)
	blocks := bg.Blocks(numBlocks)

	t.Log("Give the blocks to the first instance")

Jeromy's avatar
Jeromy committed
116
	var blkeys []u.Key
117 118
	first := instances[0]
	for _, b := range blocks {
Jeromy's avatar
Jeromy committed
119
		blkeys = append(blkeys, b.Key())
120
		first.Exchange.HasBlock(context.Background(), b)
121 122 123 124
	}

	t.Log("Distribute!")

Jeromy's avatar
Jeromy committed
125
	wg := sync.WaitGroup{}
126
	for _, inst := range instances[1:] {
Jeromy's avatar
Jeromy committed
127 128 129 130 131 132 133 134 135 136
		wg.Add(1)
		go func(inst Instance) {
			defer wg.Done()
			outch, err := inst.Exchange.GetBlocks(context.TODO(), blkeys)
			if err != nil {
				t.Fatal(err)
			}
			for _ = range outch {
			}
		}(inst)
137 138 139 140 141 142 143
	}
	wg.Wait()

	t.Log("Verify!")

	for _, inst := range instances {
		for _, b := range blocks {
144
			if _, err := inst.Blockstore().Get(b.Key()); err != nil {
145 146 147 148 149 150
				t.Fatal(err)
			}
		}
	}
}

151
func getOrFail(bitswap Instance, b *blocks.Block, t *testing.T, wg *sync.WaitGroup) {
152
	if _, err := bitswap.Blockstore().Get(b.Key()); err != nil {
153
		_, err := bitswap.Exchange.GetBlock(context.Background(), b.Key())
154 155 156 157 158 159 160
		if err != nil {
			t.Fatal(err)
		}
	}
	wg.Done()
}

161
// TODO simplify this test. get to the _essence_!
162
func TestSendToWantingPeer(t *testing.T) {
Brian Tiger Chow's avatar
Brian Tiger Chow committed
163 164 165 166
	if testing.Short() {
		t.SkipNow()
	}

167
	net := tn.VirtualNetwork(mockrouting.NewServer(), delay.Fixed(kNetworkDelay))
168
	sg := NewTestSessionGenerator(net)
Jeromy's avatar
Jeromy committed
169
	defer sg.Close()
170
	bg := blocksutil.NewBlockGenerator()
171

Brian Tiger Chow's avatar
Brian Tiger Chow committed
172 173
	prev := rebroadcastDelay.Set(time.Second / 2)
	defer func() { rebroadcastDelay.Set(prev) }()
174

175 176 177
	peers := sg.Instances(2)
	peerA := peers[0]
	peerB := peers[1]
178

179 180
	t.Logf("Session %v\n", peerA.Peer)
	t.Logf("Session %v\n", peerB.Peer)
181

182 183
	timeout := time.Second
	waitTime := time.Second * 5
184

185 186 187 188 189
	alpha := bg.Next()
	// peerA requests and waits for block alpha
	ctx, _ := context.WithTimeout(context.TODO(), waitTime)
	alphaPromise, err := peerA.Exchange.GetBlocks(ctx, []u.Key{alpha.Key()})
	if err != nil {
190 191
		t.Fatal(err)
	}
192

193 194 195 196
	// peerB announces to the network that he has block alpha
	ctx, _ = context.WithTimeout(context.TODO(), timeout)
	err = peerB.Exchange.HasBlock(ctx, alpha)
	if err != nil {
197 198
		t.Fatal(err)
	}
199

200 201 202 203
	// At some point, peerA should get alpha (or timeout)
	blkrecvd, ok := <-alphaPromise
	if !ok {
		t.Fatal("context timed out and broke promise channel!")
204
	}
205

206 207
	if blkrecvd.Key() != alpha.Key() {
		t.Fatal("Wrong block!")
208
	}
209

210
}
Jeromy's avatar
Jeromy committed
211 212

func TestBasicBitswap(t *testing.T) {
213
	net := tn.VirtualNetwork(mockrouting.NewServer(), delay.Fixed(kNetworkDelay))
214
	sg := NewTestSessionGenerator(net)
215
	defer sg.Close()
Jeromy's avatar
Jeromy committed
216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240
	bg := blocksutil.NewBlockGenerator()

	t.Log("Test a few nodes trying to get one file with a lot of blocks")

	instances := sg.Instances(2)
	blocks := bg.Blocks(1)
	err := instances[0].Exchange.HasBlock(context.TODO(), blocks[0])
	if err != nil {
		t.Fatal(err)
	}

	ctx, _ := context.WithTimeout(context.TODO(), time.Second*5)
	blk, err := instances[1].Exchange.GetBlock(ctx, blocks[0].Key())
	if err != nil {
		t.Fatal(err)
	}

	t.Log(blk)
	for _, inst := range instances {
		err := inst.Exchange.Close()
		if err != nil {
			t.Fatal(err)
		}
	}
}