bitswap_test.go 7.5 KB
Newer Older
Brian Tiger Chow's avatar
Brian Tiger Chow committed
1 2 3
package bitswap

import (
4
	"bytes"
5
	"sync"
Brian Tiger Chow's avatar
Brian Tiger Chow committed
6 7 8 9 10
	"testing"
	"time"

	context "github.com/jbenet/go-ipfs/Godeps/_workspace/src/code.google.com/p/go.net/context"

11
	ds "github.com/jbenet/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-datastore"
12
	ds_sync "github.com/jbenet/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-datastore/sync"
13
	blocks "github.com/jbenet/go-ipfs/blocks"
14
	blockstore "github.com/jbenet/go-ipfs/blockstore"
Brian Tiger Chow's avatar
Brian Tiger Chow committed
15 16
	bstore "github.com/jbenet/go-ipfs/blockstore"
	exchange "github.com/jbenet/go-ipfs/exchange"
17
	tn "github.com/jbenet/go-ipfs/exchange/bitswap/testnet"
Brian Tiger Chow's avatar
Brian Tiger Chow committed
18
	peer "github.com/jbenet/go-ipfs/peer"
19
	mock "github.com/jbenet/go-ipfs/routing/mock"
Brian Tiger Chow's avatar
Brian Tiger Chow committed
20 21 22 23
)

func TestGetBlockTimeout(t *testing.T) {

24
	net := tn.VirtualNetwork()
25
	rs := mock.VirtualRoutingServer()
26
	g := NewSessionGenerator(net, rs)
27

28
	self := g.Next()
29

Brian Tiger Chow's avatar
Brian Tiger Chow committed
30
	ctx, _ := context.WithTimeout(context.Background(), time.Nanosecond)
31
	block := blocks.NewBlock([]byte("block"))
Jeromy's avatar
Jeromy committed
32
	_, err := self.exchange.GetBlock(ctx, block.Key())
Brian Tiger Chow's avatar
Brian Tiger Chow committed
33 34 35 36 37 38 39 40

	if err != context.DeadlineExceeded {
		t.Fatal("Expected DeadlineExceeded error")
	}
}

func TestProviderForKeyButNetworkCannotFind(t *testing.T) {

41
	net := tn.VirtualNetwork()
42
	rs := mock.VirtualRoutingServer()
43
	g := NewSessionGenerator(net, rs)
Brian Tiger Chow's avatar
Brian Tiger Chow committed
44

45
	block := blocks.NewBlock([]byte("block"))
46
	rs.Announce(peer.WithIDString("testing"), block.Key()) // but not on network
Brian Tiger Chow's avatar
Brian Tiger Chow committed
47

48
	solo := g.Next()
Brian Tiger Chow's avatar
Brian Tiger Chow committed
49 50

	ctx, _ := context.WithTimeout(context.Background(), time.Nanosecond)
Jeromy's avatar
Jeromy committed
51
	_, err := solo.exchange.GetBlock(ctx, block.Key())
Brian Tiger Chow's avatar
Brian Tiger Chow committed
52

Brian Tiger Chow's avatar
Brian Tiger Chow committed
53 54 55 56 57
	if err != context.DeadlineExceeded {
		t.Fatal("Expected DeadlineExceeded error")
	}
}

Brian Tiger Chow's avatar
Brian Tiger Chow committed
58 59 60 61
// TestGetBlockAfterRequesting...

func TestGetBlockFromPeerAfterPeerAnnounces(t *testing.T) {

62
	net := tn.VirtualNetwork()
63
	rs := mock.VirtualRoutingServer()
64
	block := blocks.NewBlock([]byte("block"))
65
	g := NewSessionGenerator(net, rs)
Brian Tiger Chow's avatar
Brian Tiger Chow committed
66

67
	hasBlock := g.Next()
Brian Tiger Chow's avatar
Brian Tiger Chow committed
68

69
	if err := hasBlock.blockstore.Put(block); err != nil {
70 71
		t.Fatal(err)
	}
72
	if err := hasBlock.exchange.HasBlock(context.Background(), *block); err != nil {
73 74
		t.Fatal(err)
	}
Brian Tiger Chow's avatar
Brian Tiger Chow committed
75

76
	wantsBlock := g.Next()
Brian Tiger Chow's avatar
Brian Tiger Chow committed
77 78

	ctx, _ := context.WithTimeout(context.Background(), time.Second)
Jeromy's avatar
Jeromy committed
79
	received, err := wantsBlock.exchange.GetBlock(ctx, block.Key())
Brian Tiger Chow's avatar
Brian Tiger Chow committed
80 81 82 83
	if err != nil {
		t.Log(err)
		t.Fatal("Expected to succeed")
	}
84 85 86 87

	if !bytes.Equal(block.Data, received.Data) {
		t.Fatal("Data doesn't match")
	}
Brian Tiger Chow's avatar
Brian Tiger Chow committed
88 89
}

90
func TestLargeSwarm(t *testing.T) {
91 92 93
	if testing.Short() {
		t.SkipNow()
	}
94
	t.Parallel()
95
	numInstances := 500
96
	numBlocks := 2
97 98
	PerformDistributionTest(t, numInstances, numBlocks)
}
99

100 101 102
func TestLargeFile(t *testing.T) {
	if testing.Short() {
		t.SkipNow()
103
	}
104 105 106 107
	t.Parallel()
	numInstances := 10
	numBlocks := 100
	PerformDistributionTest(t, numInstances, numBlocks)
108 109
}

110
func PerformDistributionTest(t *testing.T, numInstances, numBlocks int) {
111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157
	if testing.Short() {
		t.SkipNow()
	}
	net := tn.VirtualNetwork()
	rs := mock.VirtualRoutingServer()
	sg := NewSessionGenerator(net, rs)
	bg := NewBlockGenerator()

	t.Log("Test a few nodes trying to get one file with a lot of blocks")

	instances := sg.Instances(numInstances)
	blocks := bg.Blocks(numBlocks)

	t.Log("Give the blocks to the first instance")

	first := instances[0]
	for _, b := range blocks {
		first.blockstore.Put(b)
		first.exchange.HasBlock(context.Background(), *b)
		rs.Announce(first.peer, b.Key())
	}

	t.Log("Distribute!")

	var wg sync.WaitGroup

	for _, inst := range instances {
		for _, b := range blocks {
			wg.Add(1)
			// NB: executing getOrFail concurrently puts tremendous pressure on
			// the goroutine scheduler
			getOrFail(inst, b, t, &wg)
		}
	}
	wg.Wait()

	t.Log("Verify!")

	for _, inst := range instances {
		for _, b := range blocks {
			if _, err := inst.blockstore.Get(b.Key()); err != nil {
				t.Fatal(err)
			}
		}
	}
}

158 159
func getOrFail(bitswap instance, b *blocks.Block, t *testing.T, wg *sync.WaitGroup) {
	if _, err := bitswap.blockstore.Get(b.Key()); err != nil {
Jeromy's avatar
Jeromy committed
160
		_, err := bitswap.exchange.GetBlock(context.Background(), b.Key())
161 162 163 164 165 166 167
		if err != nil {
			t.Fatal(err)
		}
	}
	wg.Done()
}

168
// TODO simplify this test. get to the _essence_!
169
func TestSendToWantingPeer(t *testing.T) {
Brian Tiger Chow's avatar
Brian Tiger Chow committed
170 171 172 173
	if testing.Short() {
		t.SkipNow()
	}

174
	net := tn.VirtualNetwork()
175
	rs := mock.VirtualRoutingServer()
176
	sg := NewSessionGenerator(net, rs)
177
	bg := NewBlockGenerator()
178 179 180 181 182

	me := sg.Next()
	w := sg.Next()
	o := sg.Next()

183 184 185
	t.Logf("Session %v\n", me.peer)
	t.Logf("Session %v\n", w.peer)
	t.Logf("Session %v\n", o.peer)
186

187 188
	alpha := bg.Next()

Brian Tiger Chow's avatar
Brian Tiger Chow committed
189
	const timeout = 100 * time.Millisecond // FIXME don't depend on time
190

191
	t.Logf("Peer %v attempts to get %v. NB: not available\n", w.peer, alpha.Key())
192
	ctx, _ := context.WithTimeout(context.Background(), timeout)
Jeromy's avatar
Jeromy committed
193
	_, err := w.exchange.GetBlock(ctx, alpha.Key())
194
	if err == nil {
195
		t.Fatalf("Expected %v to NOT be available", alpha.Key())
196 197 198
	}

	beta := bg.Next()
199
	t.Logf("Peer %v announes availability  of %v\n", w.peer, beta.Key())
200
	ctx, _ = context.WithTimeout(context.Background(), timeout)
201
	if err := w.blockstore.Put(&beta); err != nil {
202 203
		t.Fatal(err)
	}
204 205
	w.exchange.HasBlock(ctx, beta)

206
	t.Logf("%v gets %v from %v and discovers it wants %v\n", me.peer, beta.Key(), w.peer, alpha.Key())
207
	ctx, _ = context.WithTimeout(context.Background(), timeout)
Jeromy's avatar
Jeromy committed
208
	if _, err := me.exchange.GetBlock(ctx, beta.Key()); err != nil {
209 210
		t.Fatal(err)
	}
211

212
	t.Logf("%v announces availability of %v\n", o.peer, alpha.Key())
213
	ctx, _ = context.WithTimeout(context.Background(), timeout)
214
	if err := o.blockstore.Put(&alpha); err != nil {
215 216
		t.Fatal(err)
	}
217 218
	o.exchange.HasBlock(ctx, alpha)

219
	t.Logf("%v requests %v\n", me.peer, alpha.Key())
220
	ctx, _ = context.WithTimeout(context.Background(), timeout)
Jeromy's avatar
Jeromy committed
221
	if _, err := me.exchange.GetBlock(ctx, alpha.Key()); err != nil {
222 223
		t.Fatal(err)
	}
224

225
	t.Logf("%v should now have %v\n", w.peer, alpha.Key())
226 227 228 229 230
	block, err := w.blockstore.Get(alpha.Key())
	if err != nil {
		t.Fatal("Should not have received an error")
	}
	if block.Key() != alpha.Key() {
231
		t.Fatal("Expected to receive alpha from me")
232
	}
233 234
}

235 236
func NewBlockGenerator() BlockGenerator {
	return BlockGenerator{}
237 238 239
}

type BlockGenerator struct {
240
	seq int
241 242 243 244
}

func (bg *BlockGenerator) Next() blocks.Block {
	bg.seq++
245
	return *blocks.NewBlock([]byte(string(bg.seq)))
246 247 248 249 250 251 252 253 254 255 256
}

func (bg *BlockGenerator) Blocks(n int) []*blocks.Block {
	blocks := make([]*blocks.Block, 0)
	for i := 0; i < n; i++ {
		b := bg.Next()
		blocks = append(blocks, &b)
	}
	return blocks
}

257
func NewSessionGenerator(
258
	net tn.Network, rs mock.RoutingServer) SessionGenerator {
259 260 261 262 263 264 265 266 267 268
	return SessionGenerator{
		net: net,
		rs:  rs,
		seq: 0,
	}
}

type SessionGenerator struct {
	seq int
	net tn.Network
269
	rs  mock.RoutingServer
270 271
}

272
func (g *SessionGenerator) Next() instance {
273 274 275 276
	g.seq++
	return session(g.net, g.rs, []byte(string(g.seq)))
}

277 278 279 280 281 282 283 284 285 286
func (g *SessionGenerator) Instances(n int) []instance {
	instances := make([]instance, 0)
	for j := 0; j < n; j++ {
		inst := g.Next()
		instances = append(instances, inst)
	}
	return instances
}

type instance struct {
287
	peer       peer.Peer
Brian Tiger Chow's avatar
Brian Tiger Chow committed
288 289 290 291
	exchange   exchange.Interface
	blockstore bstore.Blockstore
}

292 293 294 295 296
// session creates a test bitswap session.
//
// NB: It's easy make mistakes by providing the same peer ID to two different
// sessions. To safeguard, use the SessionGenerator to generate sessions. It's
// just a much better idea.
297
func session(net tn.Network, rs mock.RoutingServer, id peer.ID) instance {
298
	p := peer.WithID(id)
Brian Tiger Chow's avatar
Brian Tiger Chow committed
299 300

	adapter := net.Adapter(p)
Jeromy's avatar
Jeromy committed
301
	htc := rs.Client(p)
302
	bstore := blockstore.NewBlockstore(ds_sync.MutexWrap(ds.NewMapDatastore()))
Brian Tiger Chow's avatar
Brian Tiger Chow committed
303

304
	const alwaysSendToPeer = true
305 306 307 308
	ctx := context.TODO()

	bs := New(ctx, p, adapter, htc, bstore, alwaysSendToPeer)

309
	return instance{
Brian Tiger Chow's avatar
Brian Tiger Chow committed
310 311
		peer:       p,
		exchange:   bs,
312
		blockstore: bstore,
Brian Tiger Chow's avatar
Brian Tiger Chow committed
313 314
	}
}