bitswap_test.go 7.4 KB
Newer Older
1
package bitswap
Brian Tiger Chow's avatar
Brian Tiger Chow committed
2 3

import (
4
	"bytes"
5
	"sync"
Brian Tiger Chow's avatar
Brian Tiger Chow committed
6 7 8 9
	"testing"
	"time"

	context "github.com/jbenet/go-ipfs/Godeps/_workspace/src/code.google.com/p/go.net/context"
10
	blocks "github.com/jbenet/go-ipfs/blocks"
11
	blocksutil "github.com/jbenet/go-ipfs/blocks/blocksutil"
12
	tn "github.com/jbenet/go-ipfs/exchange/bitswap/testnet"
Brian Tiger Chow's avatar
Brian Tiger Chow committed
13
	mockrouting "github.com/jbenet/go-ipfs/routing/mock"
Jeromy's avatar
Jeromy committed
14
	u "github.com/jbenet/go-ipfs/util"
15
	delay "github.com/jbenet/go-ipfs/util/delay"
16
	testutil "github.com/jbenet/go-ipfs/util/testutil"
Brian Tiger Chow's avatar
Brian Tiger Chow committed
17 18
)

Brian Tiger Chow's avatar
Brian Tiger Chow committed
19 20
// FIXME the tests are really sensitive to the network delay. fix them to work
// well under varying conditions
21 22
const kNetworkDelay = 0 * time.Millisecond

23 24 25
func TestClose(t *testing.T) {
	// TODO
	t.Skip("TODO Bitswap's Close implementation is a WIP")
26
	vnet := tn.VirtualNetwork(delay.Fixed(kNetworkDelay))
Brian Tiger Chow's avatar
Brian Tiger Chow committed
27
	rout := mockrouting.NewServer()
28
	sesgen := NewSessionGenerator(vnet, rout)
Jeromy's avatar
Jeromy committed
29
	defer sesgen.Stop()
30
	bgen := blocksutil.NewBlockGenerator()
31 32 33 34

	block := bgen.Next()
	bitswap := sesgen.Next()

35 36
	bitswap.Exchange.Close()
	bitswap.Exchange.GetBlock(context.Background(), block.Key())
37 38
}

Brian Tiger Chow's avatar
Brian Tiger Chow committed
39 40
func TestGetBlockTimeout(t *testing.T) {

41
	net := tn.VirtualNetwork(delay.Fixed(kNetworkDelay))
Brian Tiger Chow's avatar
Brian Tiger Chow committed
42
	rs := mockrouting.NewServer()
43
	g := NewSessionGenerator(net, rs)
Jeromy's avatar
Jeromy committed
44
	defer g.Stop()
45

46
	self := g.Next()
47

Brian Tiger Chow's avatar
Brian Tiger Chow committed
48
	ctx, _ := context.WithTimeout(context.Background(), time.Nanosecond)
49
	block := blocks.NewBlock([]byte("block"))
50
	_, err := self.Exchange.GetBlock(ctx, block.Key())
Brian Tiger Chow's avatar
Brian Tiger Chow committed
51 52 53 54 55 56 57 58

	if err != context.DeadlineExceeded {
		t.Fatal("Expected DeadlineExceeded error")
	}
}

func TestProviderForKeyButNetworkCannotFind(t *testing.T) {

59
	net := tn.VirtualNetwork(delay.Fixed(kNetworkDelay))
Brian Tiger Chow's avatar
Brian Tiger Chow committed
60
	rs := mockrouting.NewServer()
61
	g := NewSessionGenerator(net, rs)
Jeromy's avatar
Jeromy committed
62
	defer g.Stop()
Brian Tiger Chow's avatar
Brian Tiger Chow committed
63

64
	block := blocks.NewBlock([]byte("block"))
Brian Tiger Chow's avatar
Brian Tiger Chow committed
65
	rs.Client(testutil.NewPeerWithIDString("testing")).Provide(context.Background(), block.Key()) // but not on network
Brian Tiger Chow's avatar
Brian Tiger Chow committed
66

67
	solo := g.Next()
Jeromy's avatar
Jeromy committed
68
	defer solo.Exchange.Close()
Brian Tiger Chow's avatar
Brian Tiger Chow committed
69 70

	ctx, _ := context.WithTimeout(context.Background(), time.Nanosecond)
71
	_, err := solo.Exchange.GetBlock(ctx, block.Key())
Brian Tiger Chow's avatar
Brian Tiger Chow committed
72

Brian Tiger Chow's avatar
Brian Tiger Chow committed
73 74 75 76 77
	if err != context.DeadlineExceeded {
		t.Fatal("Expected DeadlineExceeded error")
	}
}

Brian Tiger Chow's avatar
Brian Tiger Chow committed
78 79 80 81
// TestGetBlockAfterRequesting...

func TestGetBlockFromPeerAfterPeerAnnounces(t *testing.T) {

82
	net := tn.VirtualNetwork(delay.Fixed(kNetworkDelay))
Brian Tiger Chow's avatar
Brian Tiger Chow committed
83
	rs := mockrouting.NewServer()
84
	block := blocks.NewBlock([]byte("block"))
85
	g := NewSessionGenerator(net, rs)
Jeromy's avatar
Jeromy committed
86
	defer g.Stop()
Brian Tiger Chow's avatar
Brian Tiger Chow committed
87

88
	hasBlock := g.Next()
Jeromy's avatar
Jeromy committed
89
	defer hasBlock.Exchange.Close()
Brian Tiger Chow's avatar
Brian Tiger Chow committed
90

91
	if err := hasBlock.Blockstore().Put(block); err != nil {
92 93
		t.Fatal(err)
	}
94
	if err := hasBlock.Exchange.HasBlock(context.Background(), block); err != nil {
95 96
		t.Fatal(err)
	}
Brian Tiger Chow's avatar
Brian Tiger Chow committed
97

98
	wantsBlock := g.Next()
Jeromy's avatar
Jeromy committed
99
	defer wantsBlock.Exchange.Close()
Brian Tiger Chow's avatar
Brian Tiger Chow committed
100 101

	ctx, _ := context.WithTimeout(context.Background(), time.Second)
102
	received, err := wantsBlock.Exchange.GetBlock(ctx, block.Key())
Brian Tiger Chow's avatar
Brian Tiger Chow committed
103 104 105 106
	if err != nil {
		t.Log(err)
		t.Fatal("Expected to succeed")
	}
107 108 109 110

	if !bytes.Equal(block.Data, received.Data) {
		t.Fatal("Data doesn't match")
	}
Brian Tiger Chow's avatar
Brian Tiger Chow committed
111 112
}

113
func TestLargeSwarm(t *testing.T) {
114 115 116
	if testing.Short() {
		t.SkipNow()
	}
117
	t.Parallel()
Jeromy's avatar
Jeromy committed
118
	numInstances := 500
119
	numBlocks := 2
120 121
	PerformDistributionTest(t, numInstances, numBlocks)
}
122

123 124 125
func TestLargeFile(t *testing.T) {
	if testing.Short() {
		t.SkipNow()
126
	}
127 128 129 130
	t.Parallel()
	numInstances := 10
	numBlocks := 100
	PerformDistributionTest(t, numInstances, numBlocks)
131 132
}

133
func PerformDistributionTest(t *testing.T, numInstances, numBlocks int) {
134 135 136
	if testing.Short() {
		t.SkipNow()
	}
137
	net := tn.VirtualNetwork(delay.Fixed(kNetworkDelay))
Brian Tiger Chow's avatar
Brian Tiger Chow committed
138
	rs := mockrouting.NewServer()
139
	sg := NewSessionGenerator(net, rs)
Jeromy's avatar
Jeromy committed
140
	defer sg.Stop()
141
	bg := blocksutil.NewBlockGenerator()
142 143 144 145 146 147 148 149

	t.Log("Test a few nodes trying to get one file with a lot of blocks")

	instances := sg.Instances(numInstances)
	blocks := bg.Blocks(numBlocks)

	t.Log("Give the blocks to the first instance")

Jeromy's avatar
Jeromy committed
150
	var blkeys []u.Key
151 152
	first := instances[0]
	for _, b := range blocks {
153
		first.Blockstore().Put(b)
Jeromy's avatar
Jeromy committed
154
		blkeys = append(blkeys, b.Key())
155
		first.Exchange.HasBlock(context.Background(), b)
Brian Tiger Chow's avatar
Brian Tiger Chow committed
156
		rs.Client(first.Peer).Provide(context.Background(), b.Key())
157 158 159 160
	}

	t.Log("Distribute!")

Jeromy's avatar
Jeromy committed
161
	wg := sync.WaitGroup{}
162
	for _, inst := range instances {
Jeromy's avatar
Jeromy committed
163 164 165 166 167 168 169 170 171 172
		wg.Add(1)
		go func(inst Instance) {
			defer wg.Done()
			outch, err := inst.Exchange.GetBlocks(context.TODO(), blkeys)
			if err != nil {
				t.Fatal(err)
			}
			for _ = range outch {
			}
		}(inst)
173 174 175 176 177 178 179
	}
	wg.Wait()

	t.Log("Verify!")

	for _, inst := range instances {
		for _, b := range blocks {
180
			if _, err := inst.Blockstore().Get(b.Key()); err != nil {
181 182 183 184 185 186
				t.Fatal(err)
			}
		}
	}
}

187
func getOrFail(bitswap Instance, b *blocks.Block, t *testing.T, wg *sync.WaitGroup) {
188
	if _, err := bitswap.Blockstore().Get(b.Key()); err != nil {
189
		_, err := bitswap.Exchange.GetBlock(context.Background(), b.Key())
190 191 192 193 194 195 196
		if err != nil {
			t.Fatal(err)
		}
	}
	wg.Done()
}

197
// TODO simplify this test. get to the _essence_!
198
func TestSendToWantingPeer(t *testing.T) {
Brian Tiger Chow's avatar
Brian Tiger Chow committed
199 200 201 202
	if testing.Short() {
		t.SkipNow()
	}

203
	net := tn.VirtualNetwork(delay.Fixed(kNetworkDelay))
Brian Tiger Chow's avatar
Brian Tiger Chow committed
204
	rs := mockrouting.NewServer()
205
	sg := NewSessionGenerator(net, rs)
Jeromy's avatar
Jeromy committed
206
	defer sg.Stop()
207
	bg := blocksutil.NewBlockGenerator()
208 209 210 211 212

	me := sg.Next()
	w := sg.Next()
	o := sg.Next()

213 214 215
	t.Logf("Session %v\n", me.Peer)
	t.Logf("Session %v\n", w.Peer)
	t.Logf("Session %v\n", o.Peer)
216

217 218
	alpha := bg.Next()

Jeromy's avatar
Jeromy committed
219
	const timeout = 1000 * time.Millisecond // FIXME don't depend on time
220

221
	t.Logf("Peer %v attempts to get %v. NB: not available\n", w.Peer, alpha.Key())
222
	ctx, _ := context.WithTimeout(context.Background(), timeout)
223
	_, err := w.Exchange.GetBlock(ctx, alpha.Key())
224
	if err == nil {
225
		t.Fatalf("Expected %v to NOT be available", alpha.Key())
226 227 228
	}

	beta := bg.Next()
229
	t.Logf("Peer %v announes availability  of %v\n", w.Peer, beta.Key())
230
	ctx, _ = context.WithTimeout(context.Background(), timeout)
231
	if err := w.Blockstore().Put(beta); err != nil {
232 233
		t.Fatal(err)
	}
234
	w.Exchange.HasBlock(ctx, beta)
235

236
	t.Logf("%v gets %v from %v and discovers it wants %v\n", me.Peer, beta.Key(), w.Peer, alpha.Key())
237
	ctx, _ = context.WithTimeout(context.Background(), timeout)
238
	if _, err := me.Exchange.GetBlock(ctx, beta.Key()); err != nil {
239 240
		t.Fatal(err)
	}
241

242
	t.Logf("%v announces availability of %v\n", o.Peer, alpha.Key())
243
	ctx, _ = context.WithTimeout(context.Background(), timeout)
244
	if err := o.Blockstore().Put(alpha); err != nil {
245 246
		t.Fatal(err)
	}
247
	o.Exchange.HasBlock(ctx, alpha)
248

249
	t.Logf("%v requests %v\n", me.Peer, alpha.Key())
250
	ctx, _ = context.WithTimeout(context.Background(), timeout)
251
	if _, err := me.Exchange.GetBlock(ctx, alpha.Key()); err != nil {
252 253
		t.Fatal(err)
	}
254

255
	t.Logf("%v should now have %v\n", w.Peer, alpha.Key())
256
	block, err := w.Blockstore().Get(alpha.Key())
257
	if err != nil {
258
		t.Fatalf("Should not have received an error: %s", err)
259 260
	}
	if block.Key() != alpha.Key() {
261
		t.Fatal("Expected to receive alpha from me")
262
	}
263
}
Jeromy's avatar
Jeromy committed
264 265 266 267 268 269 270 271 272 273 274 275 276 277 278 279 280 281 282 283 284 285 286 287 288 289 290 291 292 293

func TestBasicBitswap(t *testing.T) {
	net := tn.VirtualNetwork(delay.Fixed(kNetworkDelay))
	rs := mockrouting.NewServer()
	sg := NewSessionGenerator(net, rs)
	bg := blocksutil.NewBlockGenerator()

	t.Log("Test a few nodes trying to get one file with a lot of blocks")

	instances := sg.Instances(2)
	blocks := bg.Blocks(1)
	err := instances[0].Exchange.HasBlock(context.TODO(), blocks[0])
	if err != nil {
		t.Fatal(err)
	}

	ctx, _ := context.WithTimeout(context.TODO(), time.Second*5)
	blk, err := instances[1].Exchange.GetBlock(ctx, blocks[0].Key())
	if err != nil {
		t.Fatal(err)
	}

	t.Log(blk)
	for _, inst := range instances {
		err := inst.Exchange.Close()
		if err != nil {
			t.Fatal(err)
		}
	}
}