// bitswap_test.go
package bitswap

import (
	"bytes"
	"strconv"
	"sync"
	"testing"
	"time"

	context "github.com/jbenet/go-ipfs/Godeps/_workspace/src/code.google.com/p/go.net/context"

	ds "github.com/jbenet/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-datastore"
	ds_sync "github.com/jbenet/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-datastore/sync"
	blocks "github.com/jbenet/go-ipfs/blocks"
	bstore "github.com/jbenet/go-ipfs/blockstore"
	exchange "github.com/jbenet/go-ipfs/exchange"
	notifications "github.com/jbenet/go-ipfs/exchange/bitswap/notifications"
	strategy "github.com/jbenet/go-ipfs/exchange/bitswap/strategy"
	tn "github.com/jbenet/go-ipfs/exchange/bitswap/testnet"
	peer "github.com/jbenet/go-ipfs/peer"
	mock "github.com/jbenet/go-ipfs/routing/mock"
	util "github.com/jbenet/go-ipfs/util"
)

func TestGetBlockTimeout(t *testing.T) {

26
	net := tn.VirtualNetwork()
27
	rs := mock.VirtualRoutingServer()
28
	g := NewSessionGenerator(net, rs)
29

30
	self := g.Next()
31

Brian Tiger Chow's avatar
Brian Tiger Chow committed
32
	ctx, _ := context.WithTimeout(context.Background(), time.Nanosecond)
33
	block := blocks.NewBlock([]byte("block"))
Jeromy's avatar
Jeromy committed
34
	_, err := self.exchange.GetBlock(ctx, block.Key())
Brian Tiger Chow's avatar
Brian Tiger Chow committed
35 36 37 38 39 40 41 42

	if err != context.DeadlineExceeded {
		t.Fatal("Expected DeadlineExceeded error")
	}
}

func TestProviderForKeyButNetworkCannotFind(t *testing.T) {

43
	net := tn.VirtualNetwork()
44
	rs := mock.VirtualRoutingServer()
45
	g := NewSessionGenerator(net, rs)
Brian Tiger Chow's avatar
Brian Tiger Chow committed
46

47
	block := blocks.NewBlock([]byte("block"))
48
	rs.Announce(peer.WithIDString("testing"), block.Key()) // but not on network
Brian Tiger Chow's avatar
Brian Tiger Chow committed
49

50
	solo := g.Next()
Brian Tiger Chow's avatar
Brian Tiger Chow committed
51 52

	ctx, _ := context.WithTimeout(context.Background(), time.Nanosecond)
Jeromy's avatar
Jeromy committed
53
	_, err := solo.exchange.GetBlock(ctx, block.Key())
Brian Tiger Chow's avatar
Brian Tiger Chow committed
54

Brian Tiger Chow's avatar
Brian Tiger Chow committed
55 56 57 58 59
	if err != context.DeadlineExceeded {
		t.Fatal("Expected DeadlineExceeded error")
	}
}

Brian Tiger Chow's avatar
Brian Tiger Chow committed
60 61 62 63
// TestGetBlockAfterRequesting...

func TestGetBlockFromPeerAfterPeerAnnounces(t *testing.T) {

64
	net := tn.VirtualNetwork()
65
	rs := mock.VirtualRoutingServer()
66
	block := blocks.NewBlock([]byte("block"))
67
	g := NewSessionGenerator(net, rs)
Brian Tiger Chow's avatar
Brian Tiger Chow committed
68

69
	hasBlock := g.Next()
Brian Tiger Chow's avatar
Brian Tiger Chow committed
70

71
	if err := hasBlock.blockstore.Put(block); err != nil {
72 73
		t.Fatal(err)
	}
74
	if err := hasBlock.exchange.HasBlock(context.Background(), *block); err != nil {
75 76
		t.Fatal(err)
	}
Brian Tiger Chow's avatar
Brian Tiger Chow committed
77

78
	wantsBlock := g.Next()
Brian Tiger Chow's avatar
Brian Tiger Chow committed
79 80

	ctx, _ := context.WithTimeout(context.Background(), time.Second)
Jeromy's avatar
Jeromy committed
81
	received, err := wantsBlock.exchange.GetBlock(ctx, block.Key())
Brian Tiger Chow's avatar
Brian Tiger Chow committed
82 83 84 85
	if err != nil {
		t.Log(err)
		t.Fatal("Expected to succeed")
	}
86 87 88 89

	if !bytes.Equal(block.Data, received.Data) {
		t.Fatal("Data doesn't match")
	}
Brian Tiger Chow's avatar
Brian Tiger Chow committed
90 91
}

92
func TestSwarm(t *testing.T) {
93 94 95
	if testing.Short() {
		t.SkipNow()
	}
96
	net := tn.VirtualNetwork()
97
	rs := mock.VirtualRoutingServer()
98
	sg := NewSessionGenerator(net, rs)
99
	bg := NewBlockGenerator()
100 101 102

	t.Log("Create a ton of instances, and just a few blocks")

Jeromy's avatar
Jeromy committed
103
	numInstances := 5
104 105 106 107 108 109 110 111 112
	numBlocks := 2

	instances := sg.Instances(numInstances)
	blocks := bg.Blocks(numBlocks)

	t.Log("Give the blocks to the first instance")

	first := instances[0]
	for _, b := range blocks {
113
		first.blockstore.Put(b)
114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144
		first.exchange.HasBlock(context.Background(), *b)
		rs.Announce(first.peer, b.Key())
	}

	t.Log("Distribute!")

	var wg sync.WaitGroup

	for _, inst := range instances {
		for _, b := range blocks {
			wg.Add(1)
			// NB: executing getOrFail concurrently puts tremendous pressure on
			// the goroutine scheduler
			getOrFail(inst, b, t, &wg)
		}
	}
	wg.Wait()

	t.Log("Verify!")

	for _, inst := range instances {
		for _, b := range blocks {
			if _, err := inst.blockstore.Get(b.Key()); err != nil {
				t.Fatal(err)
			}
		}
	}
}

func getOrFail(bitswap instance, b *blocks.Block, t *testing.T, wg *sync.WaitGroup) {
	if _, err := bitswap.blockstore.Get(b.Key()); err != nil {
Jeromy's avatar
Jeromy committed
145
		_, err := bitswap.exchange.GetBlock(context.Background(), b.Key())
146 147 148 149 150 151 152
		if err != nil {
			t.Fatal(err)
		}
	}
	wg.Done()
}

153
// TODO simplify this test. get to the _essence_!
154
func TestSendToWantingPeer(t *testing.T) {
Brian Tiger Chow's avatar
Brian Tiger Chow committed
155 156 157 158
	if testing.Short() {
		t.SkipNow()
	}

159
	net := tn.VirtualNetwork()
160
	rs := mock.VirtualRoutingServer()
161
	sg := NewSessionGenerator(net, rs)
162
	bg := NewBlockGenerator()
163 164 165 166 167

	me := sg.Next()
	w := sg.Next()
	o := sg.Next()

168 169 170
	t.Logf("Session %v\n", me.peer)
	t.Logf("Session %v\n", w.peer)
	t.Logf("Session %v\n", o.peer)
171

172 173
	alpha := bg.Next()

Brian Tiger Chow's avatar
Brian Tiger Chow committed
174
	const timeout = 100 * time.Millisecond // FIXME don't depend on time
175

176
	t.Logf("Peer %v attempts to get %v. NB: not available\n", w.peer, alpha.Key())
177
	ctx, _ := context.WithTimeout(context.Background(), timeout)
Jeromy's avatar
Jeromy committed
178
	_, err := w.exchange.GetBlock(ctx, alpha.Key())
179
	if err == nil {
180
		t.Fatalf("Expected %v to NOT be available", alpha.Key())
181 182 183
	}

	beta := bg.Next()
184
	t.Logf("Peer %v announes availability  of %v\n", w.peer, beta.Key())
185
	ctx, _ = context.WithTimeout(context.Background(), timeout)
186
	if err := w.blockstore.Put(&beta); err != nil {
187 188
		t.Fatal(err)
	}
189 190
	w.exchange.HasBlock(ctx, beta)

191
	t.Logf("%v gets %v from %v and discovers it wants %v\n", me.peer, beta.Key(), w.peer, alpha.Key())
192
	ctx, _ = context.WithTimeout(context.Background(), timeout)
Jeromy's avatar
Jeromy committed
193
	if _, err := me.exchange.GetBlock(ctx, beta.Key()); err != nil {
194 195
		t.Fatal(err)
	}
196

197
	t.Logf("%v announces availability of %v\n", o.peer, alpha.Key())
198
	ctx, _ = context.WithTimeout(context.Background(), timeout)
199
	if err := o.blockstore.Put(&alpha); err != nil {
200 201
		t.Fatal(err)
	}
202 203
	o.exchange.HasBlock(ctx, alpha)

204
	t.Logf("%v requests %v\n", me.peer, alpha.Key())
205
	ctx, _ = context.WithTimeout(context.Background(), timeout)
Jeromy's avatar
Jeromy committed
206
	if _, err := me.exchange.GetBlock(ctx, alpha.Key()); err != nil {
207 208
		t.Fatal(err)
	}
209

210
	t.Logf("%v should now have %v\n", w.peer, alpha.Key())
211 212 213 214 215
	block, err := w.blockstore.Get(alpha.Key())
	if err != nil {
		t.Fatal("Should not have received an error")
	}
	if block.Key() != alpha.Key() {
216
		t.Fatal("Expected to receive alpha from me")
217
	}
218 219
}

220 221
func NewBlockGenerator() BlockGenerator {
	return BlockGenerator{}
222 223 224
}

// BlockGenerator produces test blocks. Its zero value is ready to use.
type BlockGenerator struct {
	// seq is the counter advanced by Next; each block's contents are derived
	// from it.
	seq int
}

func (bg *BlockGenerator) Next() blocks.Block {
	bg.seq++
230
	return *blocks.NewBlock([]byte(string(bg.seq)))
231 232 233 234 235 236 237 238 239 240 241
}

// Blocks returns n freshly generated blocks, obtained by calling Next n
// times. For n <= 0 the result is an empty, non-nil slice.
func (bg *BlockGenerator) Blocks(n int) []*blocks.Block {
	// Clamp the capacity so a negative n still yields an empty slice instead
	// of making make panic.
	capacity := n
	if capacity < 0 {
		capacity = 0
	}
	// Named out so the imported blocks package is not shadowed.
	out := make([]*blocks.Block, 0, capacity)
	for i := 0; i < n; i++ {
		b := bg.Next()
		out = append(out, &b)
	}
	return out
}

242
func NewSessionGenerator(
243
	net tn.Network, rs mock.RoutingServer) SessionGenerator {
244 245 246 247 248 249 250 251 252 253
	return SessionGenerator{
		net: net,
		rs:  rs,
		seq: 0,
	}
}

type SessionGenerator struct {
	seq int
	net tn.Network
254
	rs  mock.RoutingServer
255 256
}

257
func (g *SessionGenerator) Next() instance {
258 259 260 261
	g.seq++
	return session(g.net, g.rs, []byte(string(g.seq)))
}

262 263 264 265 266 267 268 269 270 271
func (g *SessionGenerator) Instances(n int) []instance {
	instances := make([]instance, 0)
	for j := 0; j < n; j++ {
		inst := g.Next()
		instances = append(instances, inst)
	}
	return instances
}

type instance struct {
272
	peer       peer.Peer
Brian Tiger Chow's avatar
Brian Tiger Chow committed
273 274 275 276
	exchange   exchange.Interface
	blockstore bstore.Blockstore
}

277 278 279 280 281
// session creates a test bitswap session.
//
// NB: It's easy make mistakes by providing the same peer ID to two different
// sessions. To safeguard, use the SessionGenerator to generate sessions. It's
// just a much better idea.
282
func session(net tn.Network, rs mock.RoutingServer, id peer.ID) instance {
283
	p := peer.WithID(id)
Brian Tiger Chow's avatar
Brian Tiger Chow committed
284 285

	adapter := net.Adapter(p)
Jeromy's avatar
Jeromy committed
286
	htc := rs.Client(p)
Brian Tiger Chow's avatar
Brian Tiger Chow committed
287

288
	blockstore := bstore.NewBlockstore(ds_sync.MutexWrap(ds.NewMapDatastore()))
289
	const alwaysSendToPeer = true
Brian Tiger Chow's avatar
Brian Tiger Chow committed
290 291 292
	bs := &bitswap{
		blockstore:    blockstore,
		notifications: notifications.New(),
293
		strategy:      strategy.New(alwaysSendToPeer),
Brian Tiger Chow's avatar
Brian Tiger Chow committed
294 295
		routing:       htc,
		sender:        adapter,
296
		wantlist:      util.NewKeySet(),
Brian Tiger Chow's avatar
Brian Tiger Chow committed
297
		blockRequests: make(chan util.Key, 32),
Brian Tiger Chow's avatar
Brian Tiger Chow committed
298 299
	}
	adapter.SetDelegate(bs)
Jeromy's avatar
Jeromy committed
300
	go bs.run(context.TODO())
301
	return instance{
Brian Tiger Chow's avatar
Brian Tiger Chow committed
302 303 304 305 306
		peer:       p,
		exchange:   bs,
		blockstore: blockstore,
	}
}