bitswap_test.go 7.34 KB
Newer Older
Brian Tiger Chow's avatar
Brian Tiger Chow committed
1 2 3
package bitswap

import (
4
	"bytes"
5
	"sync"
Brian Tiger Chow's avatar
Brian Tiger Chow committed
6 7 8 9 10 11
	"testing"
	"time"

	context "github.com/jbenet/go-ipfs/Godeps/_workspace/src/code.google.com/p/go.net/context"

	ds "github.com/jbenet/go-ipfs/Godeps/_workspace/src/github.com/jbenet/datastore.go"
12
	"github.com/jbenet/go-ipfs/blocks"
Brian Tiger Chow's avatar
Brian Tiger Chow committed
13 14 15 16
	bstore "github.com/jbenet/go-ipfs/blockstore"
	exchange "github.com/jbenet/go-ipfs/exchange"
	notifications "github.com/jbenet/go-ipfs/exchange/bitswap/notifications"
	strategy "github.com/jbenet/go-ipfs/exchange/bitswap/strategy"
17
	tn "github.com/jbenet/go-ipfs/exchange/bitswap/testnet"
Brian Tiger Chow's avatar
Brian Tiger Chow committed
18
	peer "github.com/jbenet/go-ipfs/peer"
19
	mock "github.com/jbenet/go-ipfs/routing/mock"
20
	util "github.com/jbenet/go-ipfs/util"
Brian Tiger Chow's avatar
Brian Tiger Chow committed
21 22 23 24 25
	testutil "github.com/jbenet/go-ipfs/util/testutil"
)

func TestGetBlockTimeout(t *testing.T) {

26
	net := tn.VirtualNetwork()
27
	rs := mock.VirtualRoutingServer()
28
	g := NewSessionGenerator(net, rs)
29

30
	self := g.Next()
31

Brian Tiger Chow's avatar
Brian Tiger Chow committed
32 33
	ctx, _ := context.WithTimeout(context.Background(), time.Nanosecond)
	block := testutil.NewBlockOrFail(t, "block")
34
	_, err := self.exchange.Block(ctx, block.Key())
Brian Tiger Chow's avatar
Brian Tiger Chow committed
35 36 37 38 39 40 41 42

	if err != context.DeadlineExceeded {
		t.Fatal("Expected DeadlineExceeded error")
	}
}

func TestProviderForKeyButNetworkCannotFind(t *testing.T) {

43
	net := tn.VirtualNetwork()
44
	rs := mock.VirtualRoutingServer()
45
	g := NewSessionGenerator(net, rs)
Brian Tiger Chow's avatar
Brian Tiger Chow committed
46

Brian Tiger Chow's avatar
Brian Tiger Chow committed
47
	block := testutil.NewBlockOrFail(t, "block")
Brian Tiger Chow's avatar
Brian Tiger Chow committed
48 49
	rs.Announce(&peer.Peer{}, block.Key()) // but not on network

50
	solo := g.Next()
Brian Tiger Chow's avatar
Brian Tiger Chow committed
51 52 53 54

	ctx, _ := context.WithTimeout(context.Background(), time.Nanosecond)
	_, err := solo.exchange.Block(ctx, block.Key())

Brian Tiger Chow's avatar
Brian Tiger Chow committed
55 56 57 58 59
	if err != context.DeadlineExceeded {
		t.Fatal("Expected DeadlineExceeded error")
	}
}

Brian Tiger Chow's avatar
Brian Tiger Chow committed
60 61 62 63
// TestGetBlockAfterRequesting...

func TestGetBlockFromPeerAfterPeerAnnounces(t *testing.T) {

64
	net := tn.VirtualNetwork()
65
	rs := mock.VirtualRoutingServer()
Brian Tiger Chow's avatar
Brian Tiger Chow committed
66
	block := testutil.NewBlockOrFail(t, "block")
67
	g := NewSessionGenerator(net, rs)
Brian Tiger Chow's avatar
Brian Tiger Chow committed
68

69
	hasBlock := g.Next()
Brian Tiger Chow's avatar
Brian Tiger Chow committed
70

71 72 73 74 75 76
	if err := hasBlock.blockstore.Put(block); err != nil {
		t.Fatal(err)
	}
	if err := hasBlock.exchange.HasBlock(context.Background(), block); err != nil {
		t.Fatal(err)
	}
Brian Tiger Chow's avatar
Brian Tiger Chow committed
77

78
	wantsBlock := g.Next()
Brian Tiger Chow's avatar
Brian Tiger Chow committed
79 80

	ctx, _ := context.WithTimeout(context.Background(), time.Second)
81
	received, err := wantsBlock.exchange.Block(ctx, block.Key())
Brian Tiger Chow's avatar
Brian Tiger Chow committed
82 83 84 85
	if err != nil {
		t.Log(err)
		t.Fatal("Expected to succeed")
	}
86 87 88 89

	if !bytes.Equal(block.Data, received.Data) {
		t.Fatal("Data doesn't match")
	}
Brian Tiger Chow's avatar
Brian Tiger Chow committed
90 91
}

92 93
func TestSwarm(t *testing.T) {
	net := tn.VirtualNetwork()
94
	rs := mock.VirtualRoutingServer()
95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149
	sg := NewSessionGenerator(net, rs)
	bg := NewBlockGenerator(t)

	t.Log("Create a ton of instances, and just a few blocks")

	numInstances := 500
	numBlocks := 2

	instances := sg.Instances(numInstances)
	blocks := bg.Blocks(numBlocks)

	t.Log("Give the blocks to the first instance")

	first := instances[0]
	for _, b := range blocks {
		first.blockstore.Put(*b)
		first.exchange.HasBlock(context.Background(), *b)
		rs.Announce(first.peer, b.Key())
	}

	t.Log("Distribute!")

	var wg sync.WaitGroup

	for _, inst := range instances {
		for _, b := range blocks {
			wg.Add(1)
			// NB: executing getOrFail concurrently puts tremendous pressure on
			// the goroutine scheduler
			getOrFail(inst, b, t, &wg)
		}
	}
	wg.Wait()

	t.Log("Verify!")

	for _, inst := range instances {
		for _, b := range blocks {
			if _, err := inst.blockstore.Get(b.Key()); err != nil {
				t.Fatal(err)
			}
		}
	}
}

func getOrFail(bitswap instance, b *blocks.Block, t *testing.T, wg *sync.WaitGroup) {
	if _, err := bitswap.blockstore.Get(b.Key()); err != nil {
		_, err := bitswap.exchange.Block(context.Background(), b.Key())
		if err != nil {
			t.Fatal(err)
		}
	}
	wg.Done()
}

150
// TODO simplify this test. get to the _essence_!
151
func TestSendToWantingPeer(t *testing.T) {
152 153
	util.Debug = true

154
	net := tn.VirtualNetwork()
155
	rs := mock.VirtualRoutingServer()
156 157 158 159 160 161 162
	sg := NewSessionGenerator(net, rs)
	bg := NewBlockGenerator(t)

	me := sg.Next()
	w := sg.Next()
	o := sg.Next()

163 164 165
	t.Logf("Session %v\n", me.peer)
	t.Logf("Session %v\n", w.peer)
	t.Logf("Session %v\n", o.peer)
166

167 168
	alpha := bg.Next()

169
	const timeout = 1 * time.Millisecond // FIXME don't depend on time
170

171
	t.Logf("Peer %v attempts to get %v. NB: not available\n", w.peer, alpha.Key())
172 173 174
	ctx, _ := context.WithTimeout(context.Background(), timeout)
	_, err := w.exchange.Block(ctx, alpha.Key())
	if err == nil {
175
		t.Fatalf("Expected %v to NOT be available", alpha.Key())
176 177 178
	}

	beta := bg.Next()
179
	t.Logf("Peer %v announes availability  of %v\n", w.peer, beta.Key())
180
	ctx, _ = context.WithTimeout(context.Background(), timeout)
181 182 183
	if err := w.blockstore.Put(beta); err != nil {
		t.Fatal(err)
	}
184 185
	w.exchange.HasBlock(ctx, beta)

186
	t.Logf("%v gets %v from %v and discovers it wants %v\n", me.peer, beta.Key(), w.peer, alpha.Key())
187
	ctx, _ = context.WithTimeout(context.Background(), timeout)
188 189 190
	if _, err := me.exchange.Block(ctx, beta.Key()); err != nil {
		t.Fatal(err)
	}
191

192
	t.Logf("%v announces availability of %v\n", o.peer, alpha.Key())
193
	ctx, _ = context.WithTimeout(context.Background(), timeout)
194 195 196
	if err := o.blockstore.Put(alpha); err != nil {
		t.Fatal(err)
	}
197 198
	o.exchange.HasBlock(ctx, alpha)

199
	t.Logf("%v requests %v\n", me.peer, alpha.Key())
200
	ctx, _ = context.WithTimeout(context.Background(), timeout)
201 202 203
	if _, err := me.exchange.Block(ctx, alpha.Key()); err != nil {
		t.Fatal(err)
	}
204

205
	t.Logf("%v should now have %v\n", w.peer, alpha.Key())
206 207 208 209 210
	block, err := w.blockstore.Get(alpha.Key())
	if err != nil {
		t.Fatal("Should not have received an error")
	}
	if block.Key() != alpha.Key() {
211
		t.Fatal("Expected to receive alpha from me")
212
	}
213 214
}

215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239
func NewBlockGenerator(t *testing.T) BlockGenerator {
	return BlockGenerator{
		T: t,
	}
}

type BlockGenerator struct {
	*testing.T // b/c block generation can fail
	seq        int
}

func (bg *BlockGenerator) Next() blocks.Block {
	bg.seq++
	return testutil.NewBlockOrFail(bg.T, string(bg.seq))
}

func (bg *BlockGenerator) Blocks(n int) []*blocks.Block {
	blocks := make([]*blocks.Block, 0)
	for i := 0; i < n; i++ {
		b := bg.Next()
		blocks = append(blocks, &b)
	}
	return blocks
}

240
func NewSessionGenerator(
241
	net tn.Network, rs mock.RoutingServer) SessionGenerator {
242 243 244 245 246 247 248 249 250 251
	return SessionGenerator{
		net: net,
		rs:  rs,
		seq: 0,
	}
}

type SessionGenerator struct {
	seq int
	net tn.Network
252
	rs  mock.RoutingServer
253 254
}

255
func (g *SessionGenerator) Next() instance {
256 257 258 259
	g.seq++
	return session(g.net, g.rs, []byte(string(g.seq)))
}

260 261 262 263 264 265 266 267 268 269
func (g *SessionGenerator) Instances(n int) []instance {
	instances := make([]instance, 0)
	for j := 0; j < n; j++ {
		inst := g.Next()
		instances = append(instances, inst)
	}
	return instances
}

type instance struct {
Brian Tiger Chow's avatar
Brian Tiger Chow committed
270 271 272 273 274
	peer       *peer.Peer
	exchange   exchange.Interface
	blockstore bstore.Blockstore
}

275 276 277 278 279
// session creates a test bitswap session.
//
// NB: It's easy make mistakes by providing the same peer ID to two different
// sessions. To safeguard, use the SessionGenerator to generate sessions. It's
// just a much better idea.
280
func session(net tn.Network, rs mock.RoutingServer, id peer.ID) instance {
281
	p := &peer.Peer{ID: id}
Brian Tiger Chow's avatar
Brian Tiger Chow committed
282 283

	adapter := net.Adapter(p)
Jeromy's avatar
Jeromy committed
284
	htc := rs.Client(p)
Brian Tiger Chow's avatar
Brian Tiger Chow committed
285 286

	blockstore := bstore.NewBlockstore(ds.NewMapDatastore())
287
	const alwaysSendToPeer = true
Brian Tiger Chow's avatar
Brian Tiger Chow committed
288 289 290
	bs := &bitswap{
		blockstore:    blockstore,
		notifications: notifications.New(),
291
		strategy:      strategy.New(alwaysSendToPeer),
Brian Tiger Chow's avatar
Brian Tiger Chow committed
292 293
		routing:       htc,
		sender:        adapter,
294
		wantlist:      util.NewKeySet(),
Brian Tiger Chow's avatar
Brian Tiger Chow committed
295 296
	}
	adapter.SetDelegate(bs)
297
	return instance{
Brian Tiger Chow's avatar
Brian Tiger Chow committed
298 299 300 301 302
		peer:       p,
		exchange:   bs,
		blockstore: blockstore,
	}
}