package bitswap

import (
	"bytes"
	"strconv"
	"sync"
	"testing"
	"time"

	context "github.com/jbenet/go-ipfs/Godeps/_workspace/src/code.google.com/p/go.net/context"

	ds "github.com/jbenet/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-datastore"
	ds_sync "github.com/jbenet/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-datastore/sync"
	blocks "github.com/jbenet/go-ipfs/blocks"
	bstore "github.com/jbenet/go-ipfs/blockstore"
	exchange "github.com/jbenet/go-ipfs/exchange"
	notifications "github.com/jbenet/go-ipfs/exchange/bitswap/notifications"
	strategy "github.com/jbenet/go-ipfs/exchange/bitswap/strategy"
	tn "github.com/jbenet/go-ipfs/exchange/bitswap/testnet"
	peer "github.com/jbenet/go-ipfs/peer"
	mock "github.com/jbenet/go-ipfs/routing/mock"
	util "github.com/jbenet/go-ipfs/util"
)

func TestGetBlockTimeout(t *testing.T) {

26
	net := tn.VirtualNetwork()
27
	rs := mock.VirtualRoutingServer()
28
	g := NewSessionGenerator(net, rs)
29

30
	self := g.Next()
31

Brian Tiger Chow's avatar
Brian Tiger Chow committed
32
	ctx, _ := context.WithTimeout(context.Background(), time.Nanosecond)
33
	block := blocks.NewBlock([]byte("block"))
34
	_, err := self.exchange.Block(ctx, block.Key())
Brian Tiger Chow's avatar
Brian Tiger Chow committed
35 36 37 38 39 40 41 42

	if err != context.DeadlineExceeded {
		t.Fatal("Expected DeadlineExceeded error")
	}
}

func TestProviderForKeyButNetworkCannotFind(t *testing.T) {

43
	net := tn.VirtualNetwork()
44
	rs := mock.VirtualRoutingServer()
45
	g := NewSessionGenerator(net, rs)
Brian Tiger Chow's avatar
Brian Tiger Chow committed
46

47
	block := blocks.NewBlock([]byte("block"))
48
	rs.Announce(peer.WithIDString("testing"), block.Key()) // but not on network
Brian Tiger Chow's avatar
Brian Tiger Chow committed
49

50
	solo := g.Next()
Brian Tiger Chow's avatar
Brian Tiger Chow committed
51 52 53 54

	ctx, _ := context.WithTimeout(context.Background(), time.Nanosecond)
	_, err := solo.exchange.Block(ctx, block.Key())

Brian Tiger Chow's avatar
Brian Tiger Chow committed
55 56 57 58 59
	if err != context.DeadlineExceeded {
		t.Fatal("Expected DeadlineExceeded error")
	}
}

Brian Tiger Chow's avatar
Brian Tiger Chow committed
60 61 62 63
// TestGetBlockAfterRequesting...

func TestGetBlockFromPeerAfterPeerAnnounces(t *testing.T) {

64
	net := tn.VirtualNetwork()
65
	rs := mock.VirtualRoutingServer()
66
	block := blocks.NewBlock([]byte("block"))
67
	g := NewSessionGenerator(net, rs)
Brian Tiger Chow's avatar
Brian Tiger Chow committed
68

69
	hasBlock := g.Next()
Brian Tiger Chow's avatar
Brian Tiger Chow committed
70

71
	if err := hasBlock.blockstore.Put(block); err != nil {
72 73
		t.Fatal(err)
	}
74
	if err := hasBlock.exchange.HasBlock(context.Background(), *block); err != nil {
75 76
		t.Fatal(err)
	}
Brian Tiger Chow's avatar
Brian Tiger Chow committed
77

78
	wantsBlock := g.Next()
Brian Tiger Chow's avatar
Brian Tiger Chow committed
79 80

	ctx, _ := context.WithTimeout(context.Background(), time.Second)
81
	received, err := wantsBlock.exchange.Block(ctx, block.Key())
Brian Tiger Chow's avatar
Brian Tiger Chow committed
82 83 84 85
	if err != nil {
		t.Log(err)
		t.Fatal("Expected to succeed")
	}
86 87 88 89

	if !bytes.Equal(block.Data, received.Data) {
		t.Fatal("Data doesn't match")
	}
Brian Tiger Chow's avatar
Brian Tiger Chow committed
90 91
}

92
func TestSwarm(t *testing.T) {
93 94 95
	if testing.Short() {
		t.SkipNow()
	}
96
	net := tn.VirtualNetwork()
97
	rs := mock.VirtualRoutingServer()
98
	sg := NewSessionGenerator(net, rs)
99
	bg := NewBlockGenerator()
100 101 102 103 104 105 106 107 108 109 110 111 112

	t.Log("Create a ton of instances, and just a few blocks")

	numInstances := 500
	numBlocks := 2

	instances := sg.Instances(numInstances)
	blocks := bg.Blocks(numBlocks)

	t.Log("Give the blocks to the first instance")

	first := instances[0]
	for _, b := range blocks {
113
		first.blockstore.Put(b)
114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152
		first.exchange.HasBlock(context.Background(), *b)
		rs.Announce(first.peer, b.Key())
	}

	t.Log("Distribute!")

	var wg sync.WaitGroup

	for _, inst := range instances {
		for _, b := range blocks {
			wg.Add(1)
			// NB: executing getOrFail concurrently puts tremendous pressure on
			// the goroutine scheduler
			getOrFail(inst, b, t, &wg)
		}
	}
	wg.Wait()

	t.Log("Verify!")

	for _, inst := range instances {
		for _, b := range blocks {
			if _, err := inst.blockstore.Get(b.Key()); err != nil {
				t.Fatal(err)
			}
		}
	}
}

// getOrFail makes sure the given instance can produce block b: if the block is
// not in the instance's local blockstore it is requested through the exchange,
// and the test is failed if that request returns an error. wg.Done is always
// called exactly once before the function exits.
func getOrFail(bitswap instance, b *blocks.Block, t *testing.T, wg *sync.WaitGroup) {
	// Deferred so the WaitGroup is released even when t.Fatal aborts below.
	defer wg.Done()

	if _, err := bitswap.blockstore.Get(b.Key()); err == nil {
		return // already stored locally
	}
	if _, err := bitswap.exchange.Block(context.Background(), b.Key()); err != nil {
		t.Fatal(err)
	}
}

153
// TODO simplify this test. get to the _essence_!
154
func TestSendToWantingPeer(t *testing.T) {
155
	net := tn.VirtualNetwork()
156
	rs := mock.VirtualRoutingServer()
157
	sg := NewSessionGenerator(net, rs)
158
	bg := NewBlockGenerator()
159 160 161 162 163

	me := sg.Next()
	w := sg.Next()
	o := sg.Next()

164 165 166
	t.Logf("Session %v\n", me.peer)
	t.Logf("Session %v\n", w.peer)
	t.Logf("Session %v\n", o.peer)
167

168 169
	alpha := bg.Next()

170
	const timeout = 1 * time.Millisecond // FIXME don't depend on time
171

172
	t.Logf("Peer %v attempts to get %v. NB: not available\n", w.peer, alpha.Key())
173 174 175
	ctx, _ := context.WithTimeout(context.Background(), timeout)
	_, err := w.exchange.Block(ctx, alpha.Key())
	if err == nil {
176
		t.Fatalf("Expected %v to NOT be available", alpha.Key())
177 178 179
	}

	beta := bg.Next()
180
	t.Logf("Peer %v announes availability  of %v\n", w.peer, beta.Key())
181
	ctx, _ = context.WithTimeout(context.Background(), timeout)
182
	if err := w.blockstore.Put(&beta); err != nil {
183 184
		t.Fatal(err)
	}
185 186
	w.exchange.HasBlock(ctx, beta)

187
	t.Logf("%v gets %v from %v and discovers it wants %v\n", me.peer, beta.Key(), w.peer, alpha.Key())
188
	ctx, _ = context.WithTimeout(context.Background(), timeout)
189 190 191
	if _, err := me.exchange.Block(ctx, beta.Key()); err != nil {
		t.Fatal(err)
	}
192

193
	t.Logf("%v announces availability of %v\n", o.peer, alpha.Key())
194
	ctx, _ = context.WithTimeout(context.Background(), timeout)
195
	if err := o.blockstore.Put(&alpha); err != nil {
196 197
		t.Fatal(err)
	}
198 199
	o.exchange.HasBlock(ctx, alpha)

200
	t.Logf("%v requests %v\n", me.peer, alpha.Key())
201
	ctx, _ = context.WithTimeout(context.Background(), timeout)
202 203 204
	if _, err := me.exchange.Block(ctx, alpha.Key()); err != nil {
		t.Fatal(err)
	}
205

206
	t.Logf("%v should now have %v\n", w.peer, alpha.Key())
207 208 209 210 211
	block, err := w.blockstore.Get(alpha.Key())
	if err != nil {
		t.Fatal("Should not have received an error")
	}
	if block.Key() != alpha.Key() {
212
		t.Fatal("Expected to receive alpha from me")
213
	}
214 215
}

216 217
func NewBlockGenerator() BlockGenerator {
	return BlockGenerator{}
218 219 220
}

// BlockGenerator hands out test blocks derived from an incrementing counter.
type BlockGenerator struct {
	// seq is incremented on every call to Next and used as the block payload.
	seq int
}

func (bg *BlockGenerator) Next() blocks.Block {
	bg.seq++
226
	return *blocks.NewBlock([]byte(string(bg.seq)))
227 228 229 230 231 232 233 234 235 236 237
}

// Blocks calls Next n times and returns pointers to the resulting blocks in
// generation order. The result is an empty, non-nil slice when n <= 0.
func (bg *BlockGenerator) Blocks(n int) []*blocks.Block {
	generated := []*blocks.Block{}
	for remaining := n; remaining > 0; remaining-- {
		// next is declared per iteration, so each pointer refers to its own
		// block value.
		next := bg.Next()
		generated = append(generated, &next)
	}
	return generated
}

238
func NewSessionGenerator(
239
	net tn.Network, rs mock.RoutingServer) SessionGenerator {
240 241 242 243 244 245 246 247 248 249
	return SessionGenerator{
		net: net,
		rs:  rs,
		seq: 0,
	}
}

type SessionGenerator struct {
	seq int
	net tn.Network
250
	rs  mock.RoutingServer
251 252
}

253
func (g *SessionGenerator) Next() instance {
254 255 256 257
	g.seq++
	return session(g.net, g.rs, []byte(string(g.seq)))
}

258 259 260 261 262 263 264 265 266 267
func (g *SessionGenerator) Instances(n int) []instance {
	instances := make([]instance, 0)
	for j := 0; j < n; j++ {
		inst := g.Next()
		instances = append(instances, inst)
	}
	return instances
}

type instance struct {
268
	peer       peer.Peer
Brian Tiger Chow's avatar
Brian Tiger Chow committed
269 270 271 272
	exchange   exchange.Interface
	blockstore bstore.Blockstore
}

273 274 275 276 277
// session creates a test bitswap session.
//
// NB: It's easy make mistakes by providing the same peer ID to two different
// sessions. To safeguard, use the SessionGenerator to generate sessions. It's
// just a much better idea.
278
func session(net tn.Network, rs mock.RoutingServer, id peer.ID) instance {
279
	p := peer.WithID(id)
Brian Tiger Chow's avatar
Brian Tiger Chow committed
280 281

	adapter := net.Adapter(p)
Jeromy's avatar
Jeromy committed
282
	htc := rs.Client(p)
Brian Tiger Chow's avatar
Brian Tiger Chow committed
283

284
	blockstore := bstore.NewBlockstore(ds_sync.MutexWrap(ds.NewMapDatastore()))
285
	const alwaysSendToPeer = true
Brian Tiger Chow's avatar
Brian Tiger Chow committed
286 287 288
	bs := &bitswap{
		blockstore:    blockstore,
		notifications: notifications.New(),
289
		strategy:      strategy.New(alwaysSendToPeer),
Brian Tiger Chow's avatar
Brian Tiger Chow committed
290 291
		routing:       htc,
		sender:        adapter,
292
		wantlist:      util.NewKeySet(),
Brian Tiger Chow's avatar
Brian Tiger Chow committed
293 294
	}
	adapter.SetDelegate(bs)
295
	return instance{
Brian Tiger Chow's avatar
Brian Tiger Chow committed
296 297 298 299 300
		peer:       p,
		exchange:   bs,
		blockstore: blockstore,
	}
}