bitswap_test.go 6.16 KB
Newer Older
1
package bitswap
Brian Tiger Chow's avatar
Brian Tiger Chow committed
2 3

import (
4
	"bytes"
5
	"sync"
Brian Tiger Chow's avatar
Brian Tiger Chow committed
6 7 8
	"testing"
	"time"

9
	context "github.com/jbenet/go-ipfs/Godeps/_workspace/src/golang.org/x/net/context"
10
	blocks "github.com/jbenet/go-ipfs/blocks"
11
	blocksutil "github.com/jbenet/go-ipfs/blocks/blocksutil"
12
	tn "github.com/jbenet/go-ipfs/exchange/bitswap/testnet"
13
	p2ptestutil "github.com/jbenet/go-ipfs/p2p/test/util"
Brian Tiger Chow's avatar
Brian Tiger Chow committed
14
	mockrouting "github.com/jbenet/go-ipfs/routing/mock"
15
	delay "github.com/jbenet/go-ipfs/thirdparty/delay"
Jeromy's avatar
Jeromy committed
16
	u "github.com/jbenet/go-ipfs/util"
Brian Tiger Chow's avatar
Brian Tiger Chow committed
17 18
)

Brian Tiger Chow's avatar
Brian Tiger Chow committed
19 20
// FIXME the tests are really sensitive to the network delay. fix them to work
// well under varying conditions
21 22
const kNetworkDelay = 0 * time.Millisecond

23
func TestClose(t *testing.T) {
24
	vnet := tn.VirtualNetwork(mockrouting.NewServer(), delay.Fixed(kNetworkDelay))
25
	sesgen := NewTestSessionGenerator(vnet)
Jeromy's avatar
Jeromy committed
26
	defer sesgen.Close()
27
	bgen := blocksutil.NewBlockGenerator()
28 29 30 31

	block := bgen.Next()
	bitswap := sesgen.Next()

32 33
	bitswap.Exchange.Close()
	bitswap.Exchange.GetBlock(context.Background(), block.Key())
34 35
}

36
func TestProviderForKeyButNetworkCannotFind(t *testing.T) { // TODO revisit this
Brian Tiger Chow's avatar
Brian Tiger Chow committed
37

Brian Tiger Chow's avatar
Brian Tiger Chow committed
38
	rs := mockrouting.NewServer()
39
	net := tn.VirtualNetwork(rs, delay.Fixed(kNetworkDelay))
40
	g := NewTestSessionGenerator(net)
Jeromy's avatar
Jeromy committed
41
	defer g.Close()
Brian Tiger Chow's avatar
Brian Tiger Chow committed
42

43
	block := blocks.NewBlock([]byte("block"))
44
	pinfo := p2ptestutil.RandTestBogusIdentityOrFatal(t)
45
	rs.Client(pinfo).Provide(context.Background(), block.Key()) // but not on network
Brian Tiger Chow's avatar
Brian Tiger Chow committed
46

47
	solo := g.Next()
Jeromy's avatar
Jeromy committed
48
	defer solo.Exchange.Close()
Brian Tiger Chow's avatar
Brian Tiger Chow committed
49 50

	ctx, _ := context.WithTimeout(context.Background(), time.Nanosecond)
51
	_, err := solo.Exchange.GetBlock(ctx, block.Key())
Brian Tiger Chow's avatar
Brian Tiger Chow committed
52

Brian Tiger Chow's avatar
Brian Tiger Chow committed
53 54 55 56 57
	if err != context.DeadlineExceeded {
		t.Fatal("Expected DeadlineExceeded error")
	}
}

Brian Tiger Chow's avatar
Brian Tiger Chow committed
58 59 60 61
// TestGetBlockAfterRequesting...

func TestGetBlockFromPeerAfterPeerAnnounces(t *testing.T) {

62
	net := tn.VirtualNetwork(mockrouting.NewServer(), delay.Fixed(kNetworkDelay))
63
	block := blocks.NewBlock([]byte("block"))
64
	g := NewTestSessionGenerator(net)
Jeromy's avatar
Jeromy committed
65
	defer g.Close()
Brian Tiger Chow's avatar
Brian Tiger Chow committed
66

67
	hasBlock := g.Next()
Jeromy's avatar
Jeromy committed
68
	defer hasBlock.Exchange.Close()
Brian Tiger Chow's avatar
Brian Tiger Chow committed
69

70
	if err := hasBlock.Blockstore().Put(block); err != nil {
71 72
		t.Fatal(err)
	}
73
	if err := hasBlock.Exchange.HasBlock(context.Background(), block); err != nil {
74 75
		t.Fatal(err)
	}
Brian Tiger Chow's avatar
Brian Tiger Chow committed
76

77
	wantsBlock := g.Next()
Jeromy's avatar
Jeromy committed
78
	defer wantsBlock.Exchange.Close()
Brian Tiger Chow's avatar
Brian Tiger Chow committed
79 80

	ctx, _ := context.WithTimeout(context.Background(), time.Second)
81
	received, err := wantsBlock.Exchange.GetBlock(ctx, block.Key())
Brian Tiger Chow's avatar
Brian Tiger Chow committed
82 83 84 85
	if err != nil {
		t.Log(err)
		t.Fatal("Expected to succeed")
	}
86 87 88 89

	if !bytes.Equal(block.Data, received.Data) {
		t.Fatal("Data doesn't match")
	}
Brian Tiger Chow's avatar
Brian Tiger Chow committed
90 91
}

92
func TestLargeSwarm(t *testing.T) {
93 94 95
	if testing.Short() {
		t.SkipNow()
	}
96
	t.Parallel()
Jeromy's avatar
Jeromy committed
97
	numInstances := 500
98
	numBlocks := 2
99 100
	PerformDistributionTest(t, numInstances, numBlocks)
}
101

102 103 104
func TestLargeFile(t *testing.T) {
	if testing.Short() {
		t.SkipNow()
105
	}
106 107 108 109
	t.Parallel()
	numInstances := 10
	numBlocks := 100
	PerformDistributionTest(t, numInstances, numBlocks)
110 111
}

112
func PerformDistributionTest(t *testing.T, numInstances, numBlocks int) {
113 114 115
	if testing.Short() {
		t.SkipNow()
	}
116
	net := tn.VirtualNetwork(mockrouting.NewServer(), delay.Fixed(kNetworkDelay))
117
	sg := NewTestSessionGenerator(net)
Jeromy's avatar
Jeromy committed
118
	defer sg.Close()
119
	bg := blocksutil.NewBlockGenerator()
120 121 122 123 124 125 126 127

	t.Log("Test a few nodes trying to get one file with a lot of blocks")

	instances := sg.Instances(numInstances)
	blocks := bg.Blocks(numBlocks)

	t.Log("Give the blocks to the first instance")

Jeromy's avatar
Jeromy committed
128
	var blkeys []u.Key
129 130
	first := instances[0]
	for _, b := range blocks {
131
		first.Blockstore().Put(b) // TODO remove. don't need to do this. bitswap owns block
Jeromy's avatar
Jeromy committed
132
		blkeys = append(blkeys, b.Key())
133
		first.Exchange.HasBlock(context.Background(), b)
134 135 136 137
	}

	t.Log("Distribute!")

Jeromy's avatar
Jeromy committed
138
	wg := sync.WaitGroup{}
139
	for _, inst := range instances {
Jeromy's avatar
Jeromy committed
140 141 142 143 144 145 146 147 148 149
		wg.Add(1)
		go func(inst Instance) {
			defer wg.Done()
			outch, err := inst.Exchange.GetBlocks(context.TODO(), blkeys)
			if err != nil {
				t.Fatal(err)
			}
			for _ = range outch {
			}
		}(inst)
150 151 152 153 154 155 156
	}
	wg.Wait()

	t.Log("Verify!")

	for _, inst := range instances {
		for _, b := range blocks {
157
			if _, err := inst.Blockstore().Get(b.Key()); err != nil {
158 159 160 161 162 163
				t.Fatal(err)
			}
		}
	}
}

164
func getOrFail(bitswap Instance, b *blocks.Block, t *testing.T, wg *sync.WaitGroup) {
165
	if _, err := bitswap.Blockstore().Get(b.Key()); err != nil {
166
		_, err := bitswap.Exchange.GetBlock(context.Background(), b.Key())
167 168 169 170 171 172 173
		if err != nil {
			t.Fatal(err)
		}
	}
	wg.Done()
}

174
// TODO simplify this test. get to the _essence_!
175
func TestSendToWantingPeer(t *testing.T) {
Brian Tiger Chow's avatar
Brian Tiger Chow committed
176 177 178 179
	if testing.Short() {
		t.SkipNow()
	}

180
	net := tn.VirtualNetwork(mockrouting.NewServer(), delay.Fixed(kNetworkDelay))
181
	sg := NewTestSessionGenerator(net)
Jeromy's avatar
Jeromy committed
182
	defer sg.Close()
183
	bg := blocksutil.NewBlockGenerator()
184

Brian Tiger Chow's avatar
Brian Tiger Chow committed
185 186
	prev := rebroadcastDelay.Set(time.Second / 2)
	defer func() { rebroadcastDelay.Set(prev) }()
187

188 189
	peerA := sg.Next()
	peerB := sg.Next()
190

191 192
	t.Logf("Session %v\n", peerA.Peer)
	t.Logf("Session %v\n", peerB.Peer)
193

194 195
	timeout := time.Second
	waitTime := time.Second * 5
196

197 198 199 200 201
	alpha := bg.Next()
	// peerA requests and waits for block alpha
	ctx, _ := context.WithTimeout(context.TODO(), waitTime)
	alphaPromise, err := peerA.Exchange.GetBlocks(ctx, []u.Key{alpha.Key()})
	if err != nil {
202 203
		t.Fatal(err)
	}
204

205 206 207 208
	// peerB announces to the network that he has block alpha
	ctx, _ = context.WithTimeout(context.TODO(), timeout)
	err = peerB.Exchange.HasBlock(ctx, alpha)
	if err != nil {
209 210
		t.Fatal(err)
	}
211

212 213 214 215
	// At some point, peerA should get alpha (or timeout)
	blkrecvd, ok := <-alphaPromise
	if !ok {
		t.Fatal("context timed out and broke promise channel!")
216
	}
217

218 219
	if blkrecvd.Key() != alpha.Key() {
		t.Fatal("Wrong block!")
220
	}
221

222
}
Jeromy's avatar
Jeromy committed
223 224

func TestBasicBitswap(t *testing.T) {
225
	net := tn.VirtualNetwork(mockrouting.NewServer(), delay.Fixed(kNetworkDelay))
226
	sg := NewTestSessionGenerator(net)
227
	defer sg.Close()
Jeromy's avatar
Jeromy committed
228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252
	bg := blocksutil.NewBlockGenerator()

	t.Log("Test a few nodes trying to get one file with a lot of blocks")

	instances := sg.Instances(2)
	blocks := bg.Blocks(1)
	err := instances[0].Exchange.HasBlock(context.TODO(), blocks[0])
	if err != nil {
		t.Fatal(err)
	}

	ctx, _ := context.WithTimeout(context.TODO(), time.Second*5)
	blk, err := instances[1].Exchange.GetBlock(ctx, blocks[0].Key())
	if err != nil {
		t.Fatal(err)
	}

	t.Log(blk)
	for _, inst := range instances {
		err := inst.Exchange.Close()
		if err != nil {
			t.Fatal(err)
		}
	}
}