bitswap_test.go 7.46 KB
Newer Older
Brian Tiger Chow's avatar
Brian Tiger Chow committed
1 2 3
package bitswap

import (
4
	"bytes"
5
	"sync"
Brian Tiger Chow's avatar
Brian Tiger Chow committed
6 7 8 9 10
	"testing"
	"time"

	context "github.com/jbenet/go-ipfs/Godeps/_workspace/src/code.google.com/p/go.net/context"

11
	ds "github.com/jbenet/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-datastore"
12
	ds_sync "github.com/jbenet/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-datastore/sync"
13
	blocks "github.com/jbenet/go-ipfs/blocks"
Brian Tiger Chow's avatar
rename  
Brian Tiger Chow committed
14
	blockstore "github.com/jbenet/go-ipfs/blocks/blockstore"
Brian Tiger Chow's avatar
Brian Tiger Chow committed
15
	exchange "github.com/jbenet/go-ipfs/exchange"
16
	tn "github.com/jbenet/go-ipfs/exchange/bitswap/testnet"
Brian Tiger Chow's avatar
Brian Tiger Chow committed
17
	peer "github.com/jbenet/go-ipfs/peer"
18
	mock "github.com/jbenet/go-ipfs/routing/mock"
Brian Tiger Chow's avatar
Brian Tiger Chow committed
19 20 21 22
)

func TestGetBlockTimeout(t *testing.T) {

23
	net := tn.VirtualNetwork()
24
	rs := mock.VirtualRoutingServer()
25
	g := NewSessionGenerator(net, rs)
26

27
	self := g.Next()
28

Brian Tiger Chow's avatar
Brian Tiger Chow committed
29
	ctx, _ := context.WithTimeout(context.Background(), time.Nanosecond)
30
	block := blocks.NewBlock([]byte("block"))
Jeromy's avatar
Jeromy committed
31
	_, err := self.exchange.GetBlock(ctx, block.Key())
Brian Tiger Chow's avatar
Brian Tiger Chow committed
32 33 34 35 36 37 38 39

	if err != context.DeadlineExceeded {
		t.Fatal("Expected DeadlineExceeded error")
	}
}

func TestProviderForKeyButNetworkCannotFind(t *testing.T) {

40
	net := tn.VirtualNetwork()
41
	rs := mock.VirtualRoutingServer()
42
	g := NewSessionGenerator(net, rs)
Brian Tiger Chow's avatar
Brian Tiger Chow committed
43

44
	block := blocks.NewBlock([]byte("block"))
45
	rs.Announce(peer.WithIDString("testing"), block.Key()) // but not on network
Brian Tiger Chow's avatar
Brian Tiger Chow committed
46

47
	solo := g.Next()
Brian Tiger Chow's avatar
Brian Tiger Chow committed
48 49

	ctx, _ := context.WithTimeout(context.Background(), time.Nanosecond)
Jeromy's avatar
Jeromy committed
50
	_, err := solo.exchange.GetBlock(ctx, block.Key())
Brian Tiger Chow's avatar
Brian Tiger Chow committed
51

Brian Tiger Chow's avatar
Brian Tiger Chow committed
52 53 54 55 56
	if err != context.DeadlineExceeded {
		t.Fatal("Expected DeadlineExceeded error")
	}
}

Brian Tiger Chow's avatar
Brian Tiger Chow committed
57 58 59 60
// TestGetBlockAfterRequesting...

func TestGetBlockFromPeerAfterPeerAnnounces(t *testing.T) {

61
	net := tn.VirtualNetwork()
62
	rs := mock.VirtualRoutingServer()
63
	block := blocks.NewBlock([]byte("block"))
64
	g := NewSessionGenerator(net, rs)
Brian Tiger Chow's avatar
Brian Tiger Chow committed
65

66
	hasBlock := g.Next()
Brian Tiger Chow's avatar
Brian Tiger Chow committed
67

68
	if err := hasBlock.blockstore.Put(block); err != nil {
69 70
		t.Fatal(err)
	}
71
	if err := hasBlock.exchange.HasBlock(context.Background(), *block); err != nil {
72 73
		t.Fatal(err)
	}
Brian Tiger Chow's avatar
Brian Tiger Chow committed
74

75
	wantsBlock := g.Next()
Brian Tiger Chow's avatar
Brian Tiger Chow committed
76 77

	ctx, _ := context.WithTimeout(context.Background(), time.Second)
Jeromy's avatar
Jeromy committed
78
	received, err := wantsBlock.exchange.GetBlock(ctx, block.Key())
Brian Tiger Chow's avatar
Brian Tiger Chow committed
79 80 81 82
	if err != nil {
		t.Log(err)
		t.Fatal("Expected to succeed")
	}
83 84 85 86

	if !bytes.Equal(block.Data, received.Data) {
		t.Fatal("Data doesn't match")
	}
Brian Tiger Chow's avatar
Brian Tiger Chow committed
87 88
}

89
func TestLargeSwarm(t *testing.T) {
90 91 92
	if testing.Short() {
		t.SkipNow()
	}
93
	t.Parallel()
94
	numInstances := 500
95
	numBlocks := 2
96 97
	PerformDistributionTest(t, numInstances, numBlocks)
}
98

99 100 101
func TestLargeFile(t *testing.T) {
	if testing.Short() {
		t.SkipNow()
102
	}
103 104 105 106
	t.Parallel()
	numInstances := 10
	numBlocks := 100
	PerformDistributionTest(t, numInstances, numBlocks)
107 108
}

109
func PerformDistributionTest(t *testing.T, numInstances, numBlocks int) {
110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156
	if testing.Short() {
		t.SkipNow()
	}
	net := tn.VirtualNetwork()
	rs := mock.VirtualRoutingServer()
	sg := NewSessionGenerator(net, rs)
	bg := NewBlockGenerator()

	t.Log("Test a few nodes trying to get one file with a lot of blocks")

	instances := sg.Instances(numInstances)
	blocks := bg.Blocks(numBlocks)

	t.Log("Give the blocks to the first instance")

	first := instances[0]
	for _, b := range blocks {
		first.blockstore.Put(b)
		first.exchange.HasBlock(context.Background(), *b)
		rs.Announce(first.peer, b.Key())
	}

	t.Log("Distribute!")

	var wg sync.WaitGroup

	for _, inst := range instances {
		for _, b := range blocks {
			wg.Add(1)
			// NB: executing getOrFail concurrently puts tremendous pressure on
			// the goroutine scheduler
			getOrFail(inst, b, t, &wg)
		}
	}
	wg.Wait()

	t.Log("Verify!")

	for _, inst := range instances {
		for _, b := range blocks {
			if _, err := inst.blockstore.Get(b.Key()); err != nil {
				t.Fatal(err)
			}
		}
	}
}

157 158
func getOrFail(bitswap instance, b *blocks.Block, t *testing.T, wg *sync.WaitGroup) {
	if _, err := bitswap.blockstore.Get(b.Key()); err != nil {
Jeromy's avatar
Jeromy committed
159
		_, err := bitswap.exchange.GetBlock(context.Background(), b.Key())
160 161 162 163 164 165 166
		if err != nil {
			t.Fatal(err)
		}
	}
	wg.Done()
}

167
// TODO simplify this test. get to the _essence_!
168
func TestSendToWantingPeer(t *testing.T) {
Brian Tiger Chow's avatar
Brian Tiger Chow committed
169 170 171 172
	if testing.Short() {
		t.SkipNow()
	}

173
	net := tn.VirtualNetwork()
174
	rs := mock.VirtualRoutingServer()
175
	sg := NewSessionGenerator(net, rs)
176
	bg := NewBlockGenerator()
177 178 179 180 181

	me := sg.Next()
	w := sg.Next()
	o := sg.Next()

182 183 184
	t.Logf("Session %v\n", me.peer)
	t.Logf("Session %v\n", w.peer)
	t.Logf("Session %v\n", o.peer)
185

186 187
	alpha := bg.Next()

Brian Tiger Chow's avatar
Brian Tiger Chow committed
188
	const timeout = 100 * time.Millisecond // FIXME don't depend on time
189

190
	t.Logf("Peer %v attempts to get %v. NB: not available\n", w.peer, alpha.Key())
191
	ctx, _ := context.WithTimeout(context.Background(), timeout)
Jeromy's avatar
Jeromy committed
192
	_, err := w.exchange.GetBlock(ctx, alpha.Key())
193
	if err == nil {
194
		t.Fatalf("Expected %v to NOT be available", alpha.Key())
195 196 197
	}

	beta := bg.Next()
198
	t.Logf("Peer %v announes availability  of %v\n", w.peer, beta.Key())
199
	ctx, _ = context.WithTimeout(context.Background(), timeout)
200
	if err := w.blockstore.Put(&beta); err != nil {
201 202
		t.Fatal(err)
	}
203 204
	w.exchange.HasBlock(ctx, beta)

205
	t.Logf("%v gets %v from %v and discovers it wants %v\n", me.peer, beta.Key(), w.peer, alpha.Key())
206
	ctx, _ = context.WithTimeout(context.Background(), timeout)
Jeromy's avatar
Jeromy committed
207
	if _, err := me.exchange.GetBlock(ctx, beta.Key()); err != nil {
208 209
		t.Fatal(err)
	}
210

211
	t.Logf("%v announces availability of %v\n", o.peer, alpha.Key())
212
	ctx, _ = context.WithTimeout(context.Background(), timeout)
213
	if err := o.blockstore.Put(&alpha); err != nil {
214 215
		t.Fatal(err)
	}
216 217
	o.exchange.HasBlock(ctx, alpha)

218
	t.Logf("%v requests %v\n", me.peer, alpha.Key())
219
	ctx, _ = context.WithTimeout(context.Background(), timeout)
Jeromy's avatar
Jeromy committed
220
	if _, err := me.exchange.GetBlock(ctx, alpha.Key()); err != nil {
221 222
		t.Fatal(err)
	}
223

224
	t.Logf("%v should now have %v\n", w.peer, alpha.Key())
225 226 227 228 229
	block, err := w.blockstore.Get(alpha.Key())
	if err != nil {
		t.Fatal("Should not have received an error")
	}
	if block.Key() != alpha.Key() {
230
		t.Fatal("Expected to receive alpha from me")
231
	}
232 233
}

234 235
func NewBlockGenerator() BlockGenerator {
	return BlockGenerator{}
236 237 238
}

// BlockGenerator hands out test blocks whose contents are derived from an
// internal sequence counter.
type BlockGenerator struct {
	// seq is incremented by Next before each block is built.
	seq int
}

func (bg *BlockGenerator) Next() blocks.Block {
	bg.seq++
244
	return *blocks.NewBlock([]byte(string(bg.seq)))
245 246 247 248 249 250 251 252 253 254 255
}

// Blocks calls Next n times and returns pointers to the resulting blocks in
// generation order. For n <= 0 the returned slice is empty but non-nil.
func (bg *BlockGenerator) Blocks(n int) []*blocks.Block {
	generated := make([]*blocks.Block, 0)
	for remaining := n; remaining > 0; remaining-- {
		// Declared inside the loop so every pointer refers to its own block.
		next := bg.Next()
		generated = append(generated, &next)
	}
	return generated
}

256
func NewSessionGenerator(
257
	net tn.Network, rs mock.RoutingServer) SessionGenerator {
258 259 260 261 262 263 264 265 266 267
	return SessionGenerator{
		net: net,
		rs:  rs,
		seq: 0,
	}
}

type SessionGenerator struct {
	seq int
	net tn.Network
268
	rs  mock.RoutingServer
269 270
}

271
func (g *SessionGenerator) Next() instance {
272 273 274 275
	g.seq++
	return session(g.net, g.rs, []byte(string(g.seq)))
}

276 277 278 279 280 281 282 283 284 285
// Instances calls Next n times and returns the resulting instances in
// creation order. For n <= 0 the returned slice is empty but non-nil.
func (g *SessionGenerator) Instances(n int) []instance {
	created := make([]instance, 0)
	for remaining := n; remaining > 0; remaining-- {
		created = append(created, g.Next())
	}
	return created
}

type instance struct {
286
	peer       peer.Peer
Brian Tiger Chow's avatar
Brian Tiger Chow committed
287
	exchange   exchange.Interface
Brian Tiger Chow's avatar
rename  
Brian Tiger Chow committed
288
	blockstore blockstore.Blockstore
Brian Tiger Chow's avatar
Brian Tiger Chow committed
289 290
}

291 292 293 294 295
// session creates a test bitswap session.
//
// NB: It's easy make mistakes by providing the same peer ID to two different
// sessions. To safeguard, use the SessionGenerator to generate sessions. It's
// just a much better idea.
296
func session(net tn.Network, rs mock.RoutingServer, id peer.ID) instance {
297
	p := peer.WithID(id)
Brian Tiger Chow's avatar
Brian Tiger Chow committed
298 299

	adapter := net.Adapter(p)
Jeromy's avatar
Jeromy committed
300
	htc := rs.Client(p)
301
	bstore := blockstore.NewBlockstore(ds_sync.MutexWrap(ds.NewMapDatastore()))
Brian Tiger Chow's avatar
Brian Tiger Chow committed
302

303
	const alwaysSendToPeer = true
304 305 306 307
	ctx := context.TODO()

	bs := New(ctx, p, adapter, htc, bstore, alwaysSendToPeer)

308
	return instance{
Brian Tiger Chow's avatar
Brian Tiger Chow committed
309 310
		peer:       p,
		exchange:   bs,
311
		blockstore: bstore,
Brian Tiger Chow's avatar
Brian Tiger Chow committed
312 313
	}
}