bitswap_test.go 7.83 KB
Newer Older
Brian Tiger Chow's avatar
Brian Tiger Chow committed
1 2 3
package bitswap

import (
	"bytes"
	"strconv"
	"sync"
	"testing"
	"time"

	context "github.com/jbenet/go-ipfs/Godeps/_workspace/src/code.google.com/p/go.net/context"

	ds "github.com/jbenet/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-datastore"
	ds_sync "github.com/jbenet/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-datastore/sync"
	blocks "github.com/jbenet/go-ipfs/blocks"
	blockstore "github.com/jbenet/go-ipfs/blocks/blockstore"
	exchange "github.com/jbenet/go-ipfs/exchange"
	tn "github.com/jbenet/go-ipfs/exchange/bitswap/testnet"
	peer "github.com/jbenet/go-ipfs/peer"
	mock "github.com/jbenet/go-ipfs/routing/mock"
)

21 22 23 24 25 26 27 28 29 30 31 32 33 34 35
func TestClose(t *testing.T) {
	// TODO
	t.Skip("TODO Bitswap's Close implementation is a WIP")
	vnet := tn.VirtualNetwork()
	rout := mock.VirtualRoutingServer()
	sesgen := NewSessionGenerator(vnet, rout)
	bgen := NewBlockGenerator()

	block := bgen.Next()
	bitswap := sesgen.Next()

	bitswap.exchange.Close()
	bitswap.exchange.GetBlock(context.Background(), block.Key())
}

Brian Tiger Chow's avatar
Brian Tiger Chow committed
36 37
func TestGetBlockTimeout(t *testing.T) {

38
	net := tn.VirtualNetwork()
39
	rs := mock.VirtualRoutingServer()
40
	g := NewSessionGenerator(net, rs)
41

42
	self := g.Next()
43

Brian Tiger Chow's avatar
Brian Tiger Chow committed
44
	ctx, _ := context.WithTimeout(context.Background(), time.Nanosecond)
45
	block := blocks.NewBlock([]byte("block"))
Jeromy's avatar
Jeromy committed
46
	_, err := self.exchange.GetBlock(ctx, block.Key())
Brian Tiger Chow's avatar
Brian Tiger Chow committed
47 48 49 50 51 52 53 54

	if err != context.DeadlineExceeded {
		t.Fatal("Expected DeadlineExceeded error")
	}
}

func TestProviderForKeyButNetworkCannotFind(t *testing.T) {

55
	net := tn.VirtualNetwork()
56
	rs := mock.VirtualRoutingServer()
57
	g := NewSessionGenerator(net, rs)
Brian Tiger Chow's avatar
Brian Tiger Chow committed
58

59
	block := blocks.NewBlock([]byte("block"))
60
	rs.Announce(peer.WithIDString("testing"), block.Key()) // but not on network
Brian Tiger Chow's avatar
Brian Tiger Chow committed
61

62
	solo := g.Next()
Brian Tiger Chow's avatar
Brian Tiger Chow committed
63 64

	ctx, _ := context.WithTimeout(context.Background(), time.Nanosecond)
Jeromy's avatar
Jeromy committed
65
	_, err := solo.exchange.GetBlock(ctx, block.Key())
Brian Tiger Chow's avatar
Brian Tiger Chow committed
66

Brian Tiger Chow's avatar
Brian Tiger Chow committed
67 68 69 70 71
	if err != context.DeadlineExceeded {
		t.Fatal("Expected DeadlineExceeded error")
	}
}

Brian Tiger Chow's avatar
Brian Tiger Chow committed
72 73 74 75
// TestGetBlockAfterRequesting...

func TestGetBlockFromPeerAfterPeerAnnounces(t *testing.T) {

76
	net := tn.VirtualNetwork()
77
	rs := mock.VirtualRoutingServer()
78
	block := blocks.NewBlock([]byte("block"))
79
	g := NewSessionGenerator(net, rs)
Brian Tiger Chow's avatar
Brian Tiger Chow committed
80

81
	hasBlock := g.Next()
Brian Tiger Chow's avatar
Brian Tiger Chow committed
82

83
	if err := hasBlock.blockstore.Put(block); err != nil {
84 85
		t.Fatal(err)
	}
86
	if err := hasBlock.exchange.HasBlock(context.Background(), *block); err != nil {
87 88
		t.Fatal(err)
	}
Brian Tiger Chow's avatar
Brian Tiger Chow committed
89

90
	wantsBlock := g.Next()
Brian Tiger Chow's avatar
Brian Tiger Chow committed
91 92

	ctx, _ := context.WithTimeout(context.Background(), time.Second)
Jeromy's avatar
Jeromy committed
93
	received, err := wantsBlock.exchange.GetBlock(ctx, block.Key())
Brian Tiger Chow's avatar
Brian Tiger Chow committed
94 95 96 97
	if err != nil {
		t.Log(err)
		t.Fatal("Expected to succeed")
	}
98 99 100 101

	if !bytes.Equal(block.Data, received.Data) {
		t.Fatal("Data doesn't match")
	}
Brian Tiger Chow's avatar
Brian Tiger Chow committed
102 103
}

104
func TestLargeSwarm(t *testing.T) {
105 106 107
	if testing.Short() {
		t.SkipNow()
	}
108
	t.Parallel()
109
	numInstances := 500
110
	numBlocks := 2
111 112
	PerformDistributionTest(t, numInstances, numBlocks)
}
113

114 115 116
func TestLargeFile(t *testing.T) {
	if testing.Short() {
		t.SkipNow()
117
	}
118 119 120 121
	t.Parallel()
	numInstances := 10
	numBlocks := 100
	PerformDistributionTest(t, numInstances, numBlocks)
122 123
}

124
func PerformDistributionTest(t *testing.T, numInstances, numBlocks int) {
125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171
	if testing.Short() {
		t.SkipNow()
	}
	net := tn.VirtualNetwork()
	rs := mock.VirtualRoutingServer()
	sg := NewSessionGenerator(net, rs)
	bg := NewBlockGenerator()

	t.Log("Test a few nodes trying to get one file with a lot of blocks")

	instances := sg.Instances(numInstances)
	blocks := bg.Blocks(numBlocks)

	t.Log("Give the blocks to the first instance")

	first := instances[0]
	for _, b := range blocks {
		first.blockstore.Put(b)
		first.exchange.HasBlock(context.Background(), *b)
		rs.Announce(first.peer, b.Key())
	}

	t.Log("Distribute!")

	var wg sync.WaitGroup

	for _, inst := range instances {
		for _, b := range blocks {
			wg.Add(1)
			// NB: executing getOrFail concurrently puts tremendous pressure on
			// the goroutine scheduler
			getOrFail(inst, b, t, &wg)
		}
	}
	wg.Wait()

	t.Log("Verify!")

	for _, inst := range instances {
		for _, b := range blocks {
			if _, err := inst.blockstore.Get(b.Key()); err != nil {
				t.Fatal(err)
			}
		}
	}
}

172 173
func getOrFail(bitswap instance, b *blocks.Block, t *testing.T, wg *sync.WaitGroup) {
	if _, err := bitswap.blockstore.Get(b.Key()); err != nil {
Jeromy's avatar
Jeromy committed
174
		_, err := bitswap.exchange.GetBlock(context.Background(), b.Key())
175 176 177 178 179 180 181
		if err != nil {
			t.Fatal(err)
		}
	}
	wg.Done()
}

182
// TODO simplify this test. get to the _essence_!
183
func TestSendToWantingPeer(t *testing.T) {
Brian Tiger Chow's avatar
Brian Tiger Chow committed
184 185 186 187
	if testing.Short() {
		t.SkipNow()
	}

188
	net := tn.VirtualNetwork()
189
	rs := mock.VirtualRoutingServer()
190
	sg := NewSessionGenerator(net, rs)
191
	bg := NewBlockGenerator()
192 193 194 195 196

	me := sg.Next()
	w := sg.Next()
	o := sg.Next()

197 198 199
	t.Logf("Session %v\n", me.peer)
	t.Logf("Session %v\n", w.peer)
	t.Logf("Session %v\n", o.peer)
200

201 202
	alpha := bg.Next()

Brian Tiger Chow's avatar
Brian Tiger Chow committed
203
	const timeout = 100 * time.Millisecond // FIXME don't depend on time
204

205
	t.Logf("Peer %v attempts to get %v. NB: not available\n", w.peer, alpha.Key())
206
	ctx, _ := context.WithTimeout(context.Background(), timeout)
Jeromy's avatar
Jeromy committed
207
	_, err := w.exchange.GetBlock(ctx, alpha.Key())
208
	if err == nil {
209
		t.Fatalf("Expected %v to NOT be available", alpha.Key())
210 211 212
	}

	beta := bg.Next()
213
	t.Logf("Peer %v announes availability  of %v\n", w.peer, beta.Key())
214
	ctx, _ = context.WithTimeout(context.Background(), timeout)
215
	if err := w.blockstore.Put(&beta); err != nil {
216 217
		t.Fatal(err)
	}
218 219
	w.exchange.HasBlock(ctx, beta)

220
	t.Logf("%v gets %v from %v and discovers it wants %v\n", me.peer, beta.Key(), w.peer, alpha.Key())
221
	ctx, _ = context.WithTimeout(context.Background(), timeout)
Jeromy's avatar
Jeromy committed
222
	if _, err := me.exchange.GetBlock(ctx, beta.Key()); err != nil {
223 224
		t.Fatal(err)
	}
225

226
	t.Logf("%v announces availability of %v\n", o.peer, alpha.Key())
227
	ctx, _ = context.WithTimeout(context.Background(), timeout)
228
	if err := o.blockstore.Put(&alpha); err != nil {
229 230
		t.Fatal(err)
	}
231 232
	o.exchange.HasBlock(ctx, alpha)

233
	t.Logf("%v requests %v\n", me.peer, alpha.Key())
234
	ctx, _ = context.WithTimeout(context.Background(), timeout)
Jeromy's avatar
Jeromy committed
235
	if _, err := me.exchange.GetBlock(ctx, alpha.Key()); err != nil {
236 237
		t.Fatal(err)
	}
238

239
	t.Logf("%v should now have %v\n", w.peer, alpha.Key())
240 241 242 243 244
	block, err := w.blockstore.Get(alpha.Key())
	if err != nil {
		t.Fatal("Should not have received an error")
	}
	if block.Key() != alpha.Key() {
245
		t.Fatal("Expected to receive alpha from me")
246
	}
247 248
}

249 250
func NewBlockGenerator() BlockGenerator {
	return BlockGenerator{}
251 252 253
}

// BlockGenerator produces test blocks derived from an internal sequence
// counter.
type BlockGenerator struct {
	seq int // sequence counter; Next increments it before building a block
}

func (bg *BlockGenerator) Next() blocks.Block {
	bg.seq++
259
	return *blocks.NewBlock([]byte(string(bg.seq)))
260 261 262 263 264 265 266 267 268 269 270
}

// Blocks calls Next n times and returns pointers to the resulting blocks in
// generation order. The result is empty (never nil) when n is not positive.
func (bg *BlockGenerator) Blocks(n int) []*blocks.Block {
	generated := make([]*blocks.Block, 0)
	for remaining := n; remaining > 0; remaining-- {
		// next is declared per iteration, so each pointer refers to its own
		// block value.
		next := bg.Next()
		generated = append(generated, &next)
	}
	return generated
}

271
func NewSessionGenerator(
272
	net tn.Network, rs mock.RoutingServer) SessionGenerator {
273 274 275 276 277 278 279 280 281 282
	return SessionGenerator{
		net: net,
		rs:  rs,
		seq: 0,
	}
}

type SessionGenerator struct {
	seq int
	net tn.Network
283
	rs  mock.RoutingServer
284 285
}

286
func (g *SessionGenerator) Next() instance {
287 288 289 290
	g.seq++
	return session(g.net, g.rs, []byte(string(g.seq)))
}

291 292 293 294 295 296 297 298 299 300
// Instances calls Next n times and returns the resulting instances in
// creation order. The result is empty (never nil) when n is not positive.
func (g *SessionGenerator) Instances(n int) []instance {
	created := make([]instance, 0)
	for remaining := n; remaining > 0; remaining-- {
		created = append(created, g.Next())
	}
	return created
}

type instance struct {
301
	peer       peer.Peer
Brian Tiger Chow's avatar
Brian Tiger Chow committed
302
	exchange   exchange.Interface
Brian Tiger Chow's avatar
rename  
Brian Tiger Chow committed
303
	blockstore blockstore.Blockstore
Brian Tiger Chow's avatar
Brian Tiger Chow committed
304 305
}

306 307 308 309 310
// session creates a test bitswap session.
//
// NB: It's easy make mistakes by providing the same peer ID to two different
// sessions. To safeguard, use the SessionGenerator to generate sessions. It's
// just a much better idea.
311
func session(net tn.Network, rs mock.RoutingServer, id peer.ID) instance {
312
	p := peer.WithID(id)
Brian Tiger Chow's avatar
Brian Tiger Chow committed
313 314

	adapter := net.Adapter(p)
Jeromy's avatar
Jeromy committed
315
	htc := rs.Client(p)
316
	bstore := blockstore.NewBlockstore(ds_sync.MutexWrap(ds.NewMapDatastore()))
Brian Tiger Chow's avatar
Brian Tiger Chow committed
317

318
	const alwaysSendToPeer = true
319 320 321 322
	ctx := context.TODO()

	bs := New(ctx, p, adapter, htc, bstore, alwaysSendToPeer)

323
	return instance{
Brian Tiger Chow's avatar
Brian Tiger Chow committed
324 325
		peer:       p,
		exchange:   bs,
326
		blockstore: bstore,
Brian Tiger Chow's avatar
Brian Tiger Chow committed
327 328
	}
}