bitswap_test.go 8.5 KB
Newer Older
Brian Tiger Chow's avatar
Brian Tiger Chow committed
1 2 3
package bitswap

import (
	"bytes"
	"strconv"
	"sync"
	"testing"
	"time"

	context "github.com/jbenet/go-ipfs/Godeps/_workspace/src/code.google.com/p/go.net/context"

	ds "github.com/jbenet/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-datastore"
	ds_sync "github.com/jbenet/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-datastore/sync"
	blocks "github.com/jbenet/go-ipfs/blocks"
	bstore "github.com/jbenet/go-ipfs/blockstore"
	exchange "github.com/jbenet/go-ipfs/exchange"
	notifications "github.com/jbenet/go-ipfs/exchange/bitswap/notifications"
	strategy "github.com/jbenet/go-ipfs/exchange/bitswap/strategy"
	tn "github.com/jbenet/go-ipfs/exchange/bitswap/testnet"
	peer "github.com/jbenet/go-ipfs/peer"
	mock "github.com/jbenet/go-ipfs/routing/mock"
	util "github.com/jbenet/go-ipfs/util"
)

func TestGetBlockTimeout(t *testing.T) {

26
	net := tn.VirtualNetwork()
27
	rs := mock.VirtualRoutingServer()
28
	g := NewSessionGenerator(net, rs)
29

30
	self := g.Next()
31

Brian Tiger Chow's avatar
Brian Tiger Chow committed
32
	ctx, _ := context.WithTimeout(context.Background(), time.Nanosecond)
33
	block := blocks.NewBlock([]byte("block"))
Jeromy's avatar
Jeromy committed
34
	_, err := self.exchange.GetBlock(ctx, block.Key())
Brian Tiger Chow's avatar
Brian Tiger Chow committed
35 36 37 38 39 40 41 42

	if err != context.DeadlineExceeded {
		t.Fatal("Expected DeadlineExceeded error")
	}
}

func TestProviderForKeyButNetworkCannotFind(t *testing.T) {

43
	net := tn.VirtualNetwork()
44
	rs := mock.VirtualRoutingServer()
45
	g := NewSessionGenerator(net, rs)
Brian Tiger Chow's avatar
Brian Tiger Chow committed
46

47
	block := blocks.NewBlock([]byte("block"))
48
	rs.Announce(peer.WithIDString("testing"), block.Key()) // but not on network
Brian Tiger Chow's avatar
Brian Tiger Chow committed
49

50
	solo := g.Next()
Brian Tiger Chow's avatar
Brian Tiger Chow committed
51 52

	ctx, _ := context.WithTimeout(context.Background(), time.Nanosecond)
Jeromy's avatar
Jeromy committed
53
	_, err := solo.exchange.GetBlock(ctx, block.Key())
Brian Tiger Chow's avatar
Brian Tiger Chow committed
54

Brian Tiger Chow's avatar
Brian Tiger Chow committed
55 56 57 58 59
	if err != context.DeadlineExceeded {
		t.Fatal("Expected DeadlineExceeded error")
	}
}

Brian Tiger Chow's avatar
Brian Tiger Chow committed
60 61 62 63
// TestGetBlockAfterRequesting...

func TestGetBlockFromPeerAfterPeerAnnounces(t *testing.T) {

64
	net := tn.VirtualNetwork()
65
	rs := mock.VirtualRoutingServer()
66
	block := blocks.NewBlock([]byte("block"))
67
	g := NewSessionGenerator(net, rs)
Brian Tiger Chow's avatar
Brian Tiger Chow committed
68

69
	hasBlock := g.Next()
Brian Tiger Chow's avatar
Brian Tiger Chow committed
70

71
	if err := hasBlock.blockstore.Put(block); err != nil {
72 73
		t.Fatal(err)
	}
74
	if err := hasBlock.exchange.HasBlock(context.Background(), *block); err != nil {
75 76
		t.Fatal(err)
	}
Brian Tiger Chow's avatar
Brian Tiger Chow committed
77

78
	wantsBlock := g.Next()
Brian Tiger Chow's avatar
Brian Tiger Chow committed
79 80

	ctx, _ := context.WithTimeout(context.Background(), time.Second)
Jeromy's avatar
Jeromy committed
81
	received, err := wantsBlock.exchange.GetBlock(ctx, block.Key())
Brian Tiger Chow's avatar
Brian Tiger Chow committed
82 83 84 85
	if err != nil {
		t.Log(err)
		t.Fatal("Expected to succeed")
	}
86 87 88 89

	if !bytes.Equal(block.Data, received.Data) {
		t.Fatal("Data doesn't match")
	}
Brian Tiger Chow's avatar
Brian Tiger Chow committed
90 91
}

92
func TestSwarm(t *testing.T) {
93 94 95
	if testing.Short() {
		t.SkipNow()
	}
96
	net := tn.VirtualNetwork()
97
	rs := mock.VirtualRoutingServer()
98
	sg := NewSessionGenerator(net, rs)
99
	bg := NewBlockGenerator()
100 101 102

	t.Log("Create a ton of instances, and just a few blocks")

103
	numInstances := 500
104 105 106 107 108 109 110 111 112
	numBlocks := 2

	instances := sg.Instances(numInstances)
	blocks := bg.Blocks(numBlocks)

	t.Log("Give the blocks to the first instance")

	first := instances[0]
	for _, b := range blocks {
113
		first.blockstore.Put(b)
114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142
		first.exchange.HasBlock(context.Background(), *b)
		rs.Announce(first.peer, b.Key())
	}

	t.Log("Distribute!")

	var wg sync.WaitGroup

	for _, inst := range instances {
		for _, b := range blocks {
			wg.Add(1)
			// NB: executing getOrFail concurrently puts tremendous pressure on
			// the goroutine scheduler
			getOrFail(inst, b, t, &wg)
		}
	}
	wg.Wait()

	t.Log("Verify!")

	for _, inst := range instances {
		for _, b := range blocks {
			if _, err := inst.blockstore.Get(b.Key()); err != nil {
				t.Fatal(err)
			}
		}
	}
}

143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193
// TestLargeFile gives 100 blocks to the first of 10 instances, asks every
// instance to obtain every block, and then verifies that each instance's
// blockstore holds all of them. Skipped in -short mode.
func TestLargeFile(t *testing.T) {
	if testing.Short() {
		t.SkipNow()
	}
	net := tn.VirtualNetwork()
	rs := mock.VirtualRoutingServer()
	sg := NewSessionGenerator(net, rs)
	bg := NewBlockGenerator()

	t.Log("Test a few nodes trying to get one file with a lot of blocks")

	numInstances := 10
	numBlocks := 100

	instances := sg.Instances(numInstances)
	blocks := bg.Blocks(numBlocks)

	t.Log("Give the blocks to the first instance")

	first := instances[0]
	for _, b := range blocks {
		// Fail right away if seeding goes wrong; otherwise the problem
		// would only surface later as a confusing distribution failure.
		if err := first.blockstore.Put(b); err != nil {
			t.Fatal(err)
		}
		if err := first.exchange.HasBlock(context.Background(), *b); err != nil {
			t.Fatal(err)
		}
		rs.Announce(first.peer, b.Key())
	}

	t.Log("Distribute!")

	var wg sync.WaitGroup

	for _, inst := range instances {
		for _, b := range blocks {
			wg.Add(1)
			// NB: executing getOrFail concurrently puts tremendous pressure on
			// the goroutine scheduler
			getOrFail(inst, b, t, &wg)
		}
	}
	wg.Wait()

	t.Log("Verify!")

	for _, inst := range instances {
		for _, b := range blocks {
			if _, err := inst.blockstore.Get(b.Key()); err != nil {
				t.Fatal(err)
			}
		}
	}
}

194 195
func getOrFail(bitswap instance, b *blocks.Block, t *testing.T, wg *sync.WaitGroup) {
	if _, err := bitswap.blockstore.Get(b.Key()); err != nil {
Jeromy's avatar
Jeromy committed
196
		_, err := bitswap.exchange.GetBlock(context.Background(), b.Key())
197 198 199 200 201 202 203
		if err != nil {
			t.Fatal(err)
		}
	}
	wg.Done()
}

204
// TODO simplify this test. get to the _essence_!
205
func TestSendToWantingPeer(t *testing.T) {
Brian Tiger Chow's avatar
Brian Tiger Chow committed
206 207 208 209
	if testing.Short() {
		t.SkipNow()
	}

210
	net := tn.VirtualNetwork()
211
	rs := mock.VirtualRoutingServer()
212
	sg := NewSessionGenerator(net, rs)
213
	bg := NewBlockGenerator()
214 215 216 217 218

	me := sg.Next()
	w := sg.Next()
	o := sg.Next()

219 220 221
	t.Logf("Session %v\n", me.peer)
	t.Logf("Session %v\n", w.peer)
	t.Logf("Session %v\n", o.peer)
222

223 224
	alpha := bg.Next()

Brian Tiger Chow's avatar
Brian Tiger Chow committed
225
	const timeout = 100 * time.Millisecond // FIXME don't depend on time
226

227
	t.Logf("Peer %v attempts to get %v. NB: not available\n", w.peer, alpha.Key())
228
	ctx, _ := context.WithTimeout(context.Background(), timeout)
Jeromy's avatar
Jeromy committed
229
	_, err := w.exchange.GetBlock(ctx, alpha.Key())
230
	if err == nil {
231
		t.Fatalf("Expected %v to NOT be available", alpha.Key())
232 233 234
	}

	beta := bg.Next()
235
	t.Logf("Peer %v announes availability  of %v\n", w.peer, beta.Key())
236
	ctx, _ = context.WithTimeout(context.Background(), timeout)
237
	if err := w.blockstore.Put(&beta); err != nil {
238 239
		t.Fatal(err)
	}
240 241
	w.exchange.HasBlock(ctx, beta)

242
	t.Logf("%v gets %v from %v and discovers it wants %v\n", me.peer, beta.Key(), w.peer, alpha.Key())
243
	ctx, _ = context.WithTimeout(context.Background(), timeout)
Jeromy's avatar
Jeromy committed
244
	if _, err := me.exchange.GetBlock(ctx, beta.Key()); err != nil {
245 246
		t.Fatal(err)
	}
247

248
	t.Logf("%v announces availability of %v\n", o.peer, alpha.Key())
249
	ctx, _ = context.WithTimeout(context.Background(), timeout)
250
	if err := o.blockstore.Put(&alpha); err != nil {
251 252
		t.Fatal(err)
	}
253 254
	o.exchange.HasBlock(ctx, alpha)

255
	t.Logf("%v requests %v\n", me.peer, alpha.Key())
256
	ctx, _ = context.WithTimeout(context.Background(), timeout)
Jeromy's avatar
Jeromy committed
257
	if _, err := me.exchange.GetBlock(ctx, alpha.Key()); err != nil {
258 259
		t.Fatal(err)
	}
260

261
	t.Logf("%v should now have %v\n", w.peer, alpha.Key())
262 263 264 265 266
	block, err := w.blockstore.Get(alpha.Key())
	if err != nil {
		t.Fatal("Should not have received an error")
	}
	if block.Key() != alpha.Key() {
267
		t.Fatal("Expected to receive alpha from me")
268
	}
269 270
}

271 272
func NewBlockGenerator() BlockGenerator {
	return BlockGenerator{}
273 274 275
}

// BlockGenerator hands out test blocks whose contents are derived from an
// internal sequence counter.
type BlockGenerator struct {
	seq int // advanced by Next before each block is built
}

func (bg *BlockGenerator) Next() blocks.Block {
	bg.seq++
281
	return *blocks.NewBlock([]byte(string(bg.seq)))
282 283 284 285 286 287 288 289 290 291 292
}

// Blocks calls Next n times and returns pointers to the resulting blocks in
// the order they were generated. A non-positive n yields an empty, non-nil
// slice.
func (bg *BlockGenerator) Blocks(n int) []*blocks.Block {
	generated := []*blocks.Block{}
	for remaining := n; remaining > 0; remaining-- {
		// A fresh variable per iteration gives every pointer its own block.
		next := bg.Next()
		generated = append(generated, &next)
	}
	return generated
}

293
func NewSessionGenerator(
294
	net tn.Network, rs mock.RoutingServer) SessionGenerator {
295 296 297 298 299 300 301 302 303 304
	return SessionGenerator{
		net: net,
		rs:  rs,
		seq: 0,
	}
}

type SessionGenerator struct {
	seq int
	net tn.Network
305
	rs  mock.RoutingServer
306 307
}

308
func (g *SessionGenerator) Next() instance {
309 310 311 312
	g.seq++
	return session(g.net, g.rs, []byte(string(g.seq)))
}

313 314 315 316 317 318 319 320 321 322
// Instances calls Next n times and returns the resulting sessions in creation
// order. A non-positive n yields an empty, non-nil slice.
func (g *SessionGenerator) Instances(n int) []instance {
	created := []instance{}
	for remaining := n; remaining > 0; remaining-- {
		created = append(created, g.Next())
	}
	return created
}

type instance struct {
323
	peer       peer.Peer
Brian Tiger Chow's avatar
Brian Tiger Chow committed
324 325 326 327
	exchange   exchange.Interface
	blockstore bstore.Blockstore
}

328 329 330 331 332
// session creates a test bitswap session.
//
// NB: It's easy make mistakes by providing the same peer ID to two different
// sessions. To safeguard, use the SessionGenerator to generate sessions. It's
// just a much better idea.
333
func session(net tn.Network, rs mock.RoutingServer, id peer.ID) instance {
334
	p := peer.WithID(id)
Brian Tiger Chow's avatar
Brian Tiger Chow committed
335 336

	adapter := net.Adapter(p)
Jeromy's avatar
Jeromy committed
337
	htc := rs.Client(p)
Brian Tiger Chow's avatar
Brian Tiger Chow committed
338

339
	blockstore := bstore.NewBlockstore(ds_sync.MutexWrap(ds.NewMapDatastore()))
340
	const alwaysSendToPeer = true
Brian Tiger Chow's avatar
Brian Tiger Chow committed
341 342 343
	bs := &bitswap{
		blockstore:    blockstore,
		notifications: notifications.New(),
344
		strategy:      strategy.New(alwaysSendToPeer),
Brian Tiger Chow's avatar
Brian Tiger Chow committed
345 346
		routing:       htc,
		sender:        adapter,
347
		wantlist:      util.NewKeySet(),
348
		batchRequests: make(chan []util.Key, 32),
Brian Tiger Chow's avatar
Brian Tiger Chow committed
349 350
	}
	adapter.SetDelegate(bs)
Jeromy's avatar
Jeromy committed
351
	go bs.run(context.TODO())
352
	return instance{
Brian Tiger Chow's avatar
Brian Tiger Chow committed
353 354 355 356 357
		peer:       p,
		exchange:   bs,
		blockstore: blockstore,
	}
}