bitswap_test.go 8.15 KB
Newer Older
Brian Tiger Chow's avatar
Brian Tiger Chow committed
1 2 3
package bitswap

import (
4
	"bytes"
5
	"sync"
Brian Tiger Chow's avatar
Brian Tiger Chow committed
6 7 8 9 10
	"testing"
	"time"

	context "github.com/jbenet/go-ipfs/Godeps/_workspace/src/code.google.com/p/go.net/context"

11
	ds "github.com/jbenet/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-datastore"
12
	ds_sync "github.com/jbenet/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-datastore/sync"
13
	blocks "github.com/jbenet/go-ipfs/blocks"
14
	blockstore "github.com/jbenet/go-ipfs/blockstore"
Brian Tiger Chow's avatar
Brian Tiger Chow committed
15 16
	bstore "github.com/jbenet/go-ipfs/blockstore"
	exchange "github.com/jbenet/go-ipfs/exchange"
17
	tn "github.com/jbenet/go-ipfs/exchange/bitswap/testnet"
Brian Tiger Chow's avatar
Brian Tiger Chow committed
18
	peer "github.com/jbenet/go-ipfs/peer"
19
	mock "github.com/jbenet/go-ipfs/routing/mock"
Brian Tiger Chow's avatar
Brian Tiger Chow committed
20 21 22 23
)

func TestGetBlockTimeout(t *testing.T) {

24
	net := tn.VirtualNetwork()
25
	rs := mock.VirtualRoutingServer()
26
	g := NewSessionGenerator(net, rs)
27

28
	self := g.Next()
29

Brian Tiger Chow's avatar
Brian Tiger Chow committed
30
	ctx, _ := context.WithTimeout(context.Background(), time.Nanosecond)
31
	block := blocks.NewBlock([]byte("block"))
Jeromy's avatar
Jeromy committed
32
	_, err := self.exchange.GetBlock(ctx, block.Key())
Brian Tiger Chow's avatar
Brian Tiger Chow committed
33 34 35 36 37 38 39 40

	if err != context.DeadlineExceeded {
		t.Fatal("Expected DeadlineExceeded error")
	}
}

func TestProviderForKeyButNetworkCannotFind(t *testing.T) {

41
	net := tn.VirtualNetwork()
42
	rs := mock.VirtualRoutingServer()
43
	g := NewSessionGenerator(net, rs)
Brian Tiger Chow's avatar
Brian Tiger Chow committed
44

45
	block := blocks.NewBlock([]byte("block"))
46
	rs.Announce(peer.WithIDString("testing"), block.Key()) // but not on network
Brian Tiger Chow's avatar
Brian Tiger Chow committed
47

48
	solo := g.Next()
Brian Tiger Chow's avatar
Brian Tiger Chow committed
49 50

	ctx, _ := context.WithTimeout(context.Background(), time.Nanosecond)
Jeromy's avatar
Jeromy committed
51
	_, err := solo.exchange.GetBlock(ctx, block.Key())
Brian Tiger Chow's avatar
Brian Tiger Chow committed
52

Brian Tiger Chow's avatar
Brian Tiger Chow committed
53 54 55 56 57
	if err != context.DeadlineExceeded {
		t.Fatal("Expected DeadlineExceeded error")
	}
}

Brian Tiger Chow's avatar
Brian Tiger Chow committed
58 59 60 61
// TestGetBlockAfterRequesting...

func TestGetBlockFromPeerAfterPeerAnnounces(t *testing.T) {

62
	net := tn.VirtualNetwork()
63
	rs := mock.VirtualRoutingServer()
64
	block := blocks.NewBlock([]byte("block"))
65
	g := NewSessionGenerator(net, rs)
Brian Tiger Chow's avatar
Brian Tiger Chow committed
66

67
	hasBlock := g.Next()
Brian Tiger Chow's avatar
Brian Tiger Chow committed
68

69
	if err := hasBlock.blockstore.Put(block); err != nil {
70 71
		t.Fatal(err)
	}
72
	if err := hasBlock.exchange.HasBlock(context.Background(), *block); err != nil {
73 74
		t.Fatal(err)
	}
Brian Tiger Chow's avatar
Brian Tiger Chow committed
75

76
	wantsBlock := g.Next()
Brian Tiger Chow's avatar
Brian Tiger Chow committed
77 78

	ctx, _ := context.WithTimeout(context.Background(), time.Second)
Jeromy's avatar
Jeromy committed
79
	received, err := wantsBlock.exchange.GetBlock(ctx, block.Key())
Brian Tiger Chow's avatar
Brian Tiger Chow committed
80 81 82 83
	if err != nil {
		t.Log(err)
		t.Fatal("Expected to succeed")
	}
84 85 86 87

	if !bytes.Equal(block.Data, received.Data) {
		t.Fatal("Data doesn't match")
	}
Brian Tiger Chow's avatar
Brian Tiger Chow committed
88 89
}

90
func TestSwarm(t *testing.T) {
91 92 93
	if testing.Short() {
		t.SkipNow()
	}
94
	net := tn.VirtualNetwork()
95
	rs := mock.VirtualRoutingServer()
96
	sg := NewSessionGenerator(net, rs)
97
	bg := NewBlockGenerator()
98 99 100

	t.Log("Create a ton of instances, and just a few blocks")

101
	numInstances := 500
102 103 104 105 106 107 108 109 110
	numBlocks := 2

	instances := sg.Instances(numInstances)
	blocks := bg.Blocks(numBlocks)

	t.Log("Give the blocks to the first instance")

	first := instances[0]
	for _, b := range blocks {
111
		first.blockstore.Put(b)
112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140
		first.exchange.HasBlock(context.Background(), *b)
		rs.Announce(first.peer, b.Key())
	}

	t.Log("Distribute!")

	var wg sync.WaitGroup

	for _, inst := range instances {
		for _, b := range blocks {
			wg.Add(1)
			// NB: executing getOrFail concurrently puts tremendous pressure on
			// the goroutine scheduler
			getOrFail(inst, b, t, &wg)
		}
	}
	wg.Wait()

	t.Log("Verify!")

	for _, inst := range instances {
		for _, b := range blocks {
			if _, err := inst.blockstore.Get(b.Key()); err != nil {
				t.Fatal(err)
			}
		}
	}
}

141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191
func TestLargeFile(t *testing.T) {
	if testing.Short() {
		t.SkipNow()
	}
	net := tn.VirtualNetwork()
	rs := mock.VirtualRoutingServer()
	sg := NewSessionGenerator(net, rs)
	bg := NewBlockGenerator()

	t.Log("Test a few nodes trying to get one file with a lot of blocks")

	numInstances := 10
	numBlocks := 100

	instances := sg.Instances(numInstances)
	blocks := bg.Blocks(numBlocks)

	t.Log("Give the blocks to the first instance")

	first := instances[0]
	for _, b := range blocks {
		first.blockstore.Put(b)
		first.exchange.HasBlock(context.Background(), *b)
		rs.Announce(first.peer, b.Key())
	}

	t.Log("Distribute!")

	var wg sync.WaitGroup

	for _, inst := range instances {
		for _, b := range blocks {
			wg.Add(1)
			// NB: executing getOrFail concurrently puts tremendous pressure on
			// the goroutine scheduler
			getOrFail(inst, b, t, &wg)
		}
	}
	wg.Wait()

	t.Log("Verify!")

	for _, inst := range instances {
		for _, b := range blocks {
			if _, err := inst.blockstore.Get(b.Key()); err != nil {
				t.Fatal(err)
			}
		}
	}
}

192 193
func getOrFail(bitswap instance, b *blocks.Block, t *testing.T, wg *sync.WaitGroup) {
	if _, err := bitswap.blockstore.Get(b.Key()); err != nil {
Jeromy's avatar
Jeromy committed
194
		_, err := bitswap.exchange.GetBlock(context.Background(), b.Key())
195 196 197 198 199 200 201
		if err != nil {
			t.Fatal(err)
		}
	}
	wg.Done()
}

202
// TODO simplify this test. get to the _essence_!
203
func TestSendToWantingPeer(t *testing.T) {
Brian Tiger Chow's avatar
Brian Tiger Chow committed
204 205 206 207
	if testing.Short() {
		t.SkipNow()
	}

208
	net := tn.VirtualNetwork()
209
	rs := mock.VirtualRoutingServer()
210
	sg := NewSessionGenerator(net, rs)
211
	bg := NewBlockGenerator()
212 213 214 215 216

	me := sg.Next()
	w := sg.Next()
	o := sg.Next()

217 218 219
	t.Logf("Session %v\n", me.peer)
	t.Logf("Session %v\n", w.peer)
	t.Logf("Session %v\n", o.peer)
220

221 222
	alpha := bg.Next()

Brian Tiger Chow's avatar
Brian Tiger Chow committed
223
	const timeout = 100 * time.Millisecond // FIXME don't depend on time
224

225
	t.Logf("Peer %v attempts to get %v. NB: not available\n", w.peer, alpha.Key())
226
	ctx, _ := context.WithTimeout(context.Background(), timeout)
Jeromy's avatar
Jeromy committed
227
	_, err := w.exchange.GetBlock(ctx, alpha.Key())
228
	if err == nil {
229
		t.Fatalf("Expected %v to NOT be available", alpha.Key())
230 231 232
	}

	beta := bg.Next()
233
	t.Logf("Peer %v announes availability  of %v\n", w.peer, beta.Key())
234
	ctx, _ = context.WithTimeout(context.Background(), timeout)
235
	if err := w.blockstore.Put(&beta); err != nil {
236 237
		t.Fatal(err)
	}
238 239
	w.exchange.HasBlock(ctx, beta)

240
	t.Logf("%v gets %v from %v and discovers it wants %v\n", me.peer, beta.Key(), w.peer, alpha.Key())
241
	ctx, _ = context.WithTimeout(context.Background(), timeout)
Jeromy's avatar
Jeromy committed
242
	if _, err := me.exchange.GetBlock(ctx, beta.Key()); err != nil {
243 244
		t.Fatal(err)
	}
245

246
	t.Logf("%v announces availability of %v\n", o.peer, alpha.Key())
247
	ctx, _ = context.WithTimeout(context.Background(), timeout)
248
	if err := o.blockstore.Put(&alpha); err != nil {
249 250
		t.Fatal(err)
	}
251 252
	o.exchange.HasBlock(ctx, alpha)

253
	t.Logf("%v requests %v\n", me.peer, alpha.Key())
254
	ctx, _ = context.WithTimeout(context.Background(), timeout)
Jeromy's avatar
Jeromy committed
255
	if _, err := me.exchange.GetBlock(ctx, alpha.Key()); err != nil {
256 257
		t.Fatal(err)
	}
258

259
	t.Logf("%v should now have %v\n", w.peer, alpha.Key())
260 261 262 263 264
	block, err := w.blockstore.Get(alpha.Key())
	if err != nil {
		t.Fatal("Should not have received an error")
	}
	if block.Key() != alpha.Key() {
265
		t.Fatal("Expected to receive alpha from me")
266
	}
267 268
}

269 270
func NewBlockGenerator() BlockGenerator {
	return BlockGenerator{}
271 272 273
}

type BlockGenerator struct {
274
	seq int
275 276 277 278
}

func (bg *BlockGenerator) Next() blocks.Block {
	bg.seq++
279
	return *blocks.NewBlock([]byte(string(bg.seq)))
280 281 282 283 284 285 286 287 288 289 290
}

func (bg *BlockGenerator) Blocks(n int) []*blocks.Block {
	blocks := make([]*blocks.Block, 0)
	for i := 0; i < n; i++ {
		b := bg.Next()
		blocks = append(blocks, &b)
	}
	return blocks
}

291
func NewSessionGenerator(
292
	net tn.Network, rs mock.RoutingServer) SessionGenerator {
293 294 295 296 297 298 299 300 301 302
	return SessionGenerator{
		net: net,
		rs:  rs,
		seq: 0,
	}
}

type SessionGenerator struct {
	seq int
	net tn.Network
303
	rs  mock.RoutingServer
304 305
}

306
func (g *SessionGenerator) Next() instance {
307 308 309 310
	g.seq++
	return session(g.net, g.rs, []byte(string(g.seq)))
}

311 312 313 314 315 316 317 318 319 320
func (g *SessionGenerator) Instances(n int) []instance {
	instances := make([]instance, 0)
	for j := 0; j < n; j++ {
		inst := g.Next()
		instances = append(instances, inst)
	}
	return instances
}

type instance struct {
321
	peer       peer.Peer
Brian Tiger Chow's avatar
Brian Tiger Chow committed
322 323 324 325
	exchange   exchange.Interface
	blockstore bstore.Blockstore
}

326 327 328 329 330
// session creates a test bitswap session.
//
// NB: It's easy make mistakes by providing the same peer ID to two different
// sessions. To safeguard, use the SessionGenerator to generate sessions. It's
// just a much better idea.
331
func session(net tn.Network, rs mock.RoutingServer, id peer.ID) instance {
332
	p := peer.WithID(id)
Brian Tiger Chow's avatar
Brian Tiger Chow committed
333 334

	adapter := net.Adapter(p)
Jeromy's avatar
Jeromy committed
335
	htc := rs.Client(p)
336
	bstore := blockstore.NewBlockstore(ds_sync.MutexWrap(ds.NewMapDatastore()))
Brian Tiger Chow's avatar
Brian Tiger Chow committed
337

338
	const alwaysSendToPeer = true
339 340 341 342
	ctx := context.TODO()

	bs := New(ctx, p, adapter, htc, bstore, alwaysSendToPeer)

343
	return instance{
Brian Tiger Chow's avatar
Brian Tiger Chow committed
344 345
		peer:       p,
		exchange:   bs,
346
		blockstore: bstore,
Brian Tiger Chow's avatar
Brian Tiger Chow committed
347 348
	}
}