bitswap_test.go 7.49 KB
Newer Older
Brian Tiger Chow's avatar
Brian Tiger Chow committed
1 2 3
package bitswap

import (
4
	"bytes"
5
	"sync"
Brian Tiger Chow's avatar
Brian Tiger Chow committed
6 7 8 9 10 11
	"testing"
	"time"

	context "github.com/jbenet/go-ipfs/Godeps/_workspace/src/code.google.com/p/go.net/context"

	ds "github.com/jbenet/go-ipfs/Godeps/_workspace/src/github.com/jbenet/datastore.go"
12
	"github.com/jbenet/go-ipfs/blocks"
Brian Tiger Chow's avatar
Brian Tiger Chow committed
13 14 15 16
	bstore "github.com/jbenet/go-ipfs/blockstore"
	exchange "github.com/jbenet/go-ipfs/exchange"
	notifications "github.com/jbenet/go-ipfs/exchange/bitswap/notifications"
	strategy "github.com/jbenet/go-ipfs/exchange/bitswap/strategy"
17
	tn "github.com/jbenet/go-ipfs/exchange/bitswap/testnet"
Brian Tiger Chow's avatar
Brian Tiger Chow committed
18
	peer "github.com/jbenet/go-ipfs/peer"
19
	util "github.com/jbenet/go-ipfs/util"
Brian Tiger Chow's avatar
Brian Tiger Chow committed
20 21 22 23 24
	testutil "github.com/jbenet/go-ipfs/util/testutil"
)

func TestGetBlockTimeout(t *testing.T) {

25 26 27
	net := tn.VirtualNetwork()
	rs := tn.VirtualRoutingServer()
	g := NewSessionGenerator(net, rs)
28

29
	self := g.Next()
30

Brian Tiger Chow's avatar
Brian Tiger Chow committed
31 32
	ctx, _ := context.WithTimeout(context.Background(), time.Nanosecond)
	block := testutil.NewBlockOrFail(t, "block")
33
	_, err := self.exchange.Block(ctx, block.Key())
Brian Tiger Chow's avatar
Brian Tiger Chow committed
34 35 36 37 38 39 40 41

	if err != context.DeadlineExceeded {
		t.Fatal("Expected DeadlineExceeded error")
	}
}

func TestProviderForKeyButNetworkCannotFind(t *testing.T) {

42 43 44
	net := tn.VirtualNetwork()
	rs := tn.VirtualRoutingServer()
	g := NewSessionGenerator(net, rs)
Brian Tiger Chow's avatar
Brian Tiger Chow committed
45

Brian Tiger Chow's avatar
Brian Tiger Chow committed
46
	block := testutil.NewBlockOrFail(t, "block")
Brian Tiger Chow's avatar
Brian Tiger Chow committed
47 48
	rs.Announce(&peer.Peer{}, block.Key()) // but not on network

49
	solo := g.Next()
Brian Tiger Chow's avatar
Brian Tiger Chow committed
50 51 52 53

	ctx, _ := context.WithTimeout(context.Background(), time.Nanosecond)
	_, err := solo.exchange.Block(ctx, block.Key())

Brian Tiger Chow's avatar
Brian Tiger Chow committed
54 55 56 57 58
	if err != context.DeadlineExceeded {
		t.Fatal("Expected DeadlineExceeded error")
	}
}

Brian Tiger Chow's avatar
Brian Tiger Chow committed
59 60 61 62
// TestGetBlockAfterRequesting...

func TestGetBlockFromPeerAfterPeerAnnounces(t *testing.T) {

63 64
	net := tn.VirtualNetwork()
	rs := tn.VirtualRoutingServer()
Brian Tiger Chow's avatar
Brian Tiger Chow committed
65
	block := testutil.NewBlockOrFail(t, "block")
66
	g := NewSessionGenerator(net, rs)
Brian Tiger Chow's avatar
Brian Tiger Chow committed
67

68
	hasBlock := g.Next()
Brian Tiger Chow's avatar
Brian Tiger Chow committed
69

70 71 72 73 74 75
	if err := hasBlock.blockstore.Put(block); err != nil {
		t.Fatal(err)
	}
	if err := hasBlock.exchange.HasBlock(context.Background(), block); err != nil {
		t.Fatal(err)
	}
Brian Tiger Chow's avatar
Brian Tiger Chow committed
76

77
	wantsBlock := g.Next()
Brian Tiger Chow's avatar
Brian Tiger Chow committed
78 79

	ctx, _ := context.WithTimeout(context.Background(), time.Second)
80
	received, err := wantsBlock.exchange.Block(ctx, block.Key())
Brian Tiger Chow's avatar
Brian Tiger Chow committed
81 82 83 84
	if err != nil {
		t.Log(err)
		t.Fatal("Expected to succeed")
	}
85 86 87 88

	if !bytes.Equal(block.Data, received.Data) {
		t.Fatal("Data doesn't match")
	}
Brian Tiger Chow's avatar
Brian Tiger Chow committed
89 90
}

91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148
// TestSwarm seeds a single instance with a small number of blocks and then
// has every instance in a large swarm fetch each of them, finally verifying
// that every instance's blockstore contains every block.
func TestSwarm(t *testing.T) {
	net := tn.VirtualNetwork()
	rs := tn.VirtualRoutingServer()
	sg := NewSessionGenerator(net, rs)
	bg := NewBlockGenerator(t)

	t.Log("Create a ton of instances, and just a few blocks")

	numInstances := 500
	numBlocks := 2

	instances := sg.Instances(numInstances)
	// Named blks so the imported package "blocks" is not shadowed.
	blks := bg.Blocks(numBlocks)

	t.Log("Give the blocks to the first instance")

	first := instances[0]
	for _, b := range blks {
		// A failed seed would otherwise only surface later as an unrelated
		// fetch failure, so stop right here.
		if err := first.blockstore.Put(*b); err != nil {
			t.Fatal(err)
		}
		if err := first.exchange.HasBlock(context.Background(), *b); err != nil {
			t.Fatal(err)
		}
		rs.Announce(first.peer, b.Key())
	}

	t.Log("Distribute!")

	var wg sync.WaitGroup

	for _, inst := range instances {
		for _, b := range blks {
			wg.Add(1)
			// NB: executing getOrFail concurrently puts tremendous pressure on
			// the goroutine scheduler
			getOrFail(inst, b, t, &wg)
		}
	}
	wg.Wait()

	t.Log("Verify!")

	for _, inst := range instances {
		for _, b := range blks {
			if _, err := inst.blockstore.Get(b.Key()); err != nil {
				t.Fatal(err)
			}
		}
	}
}

// getOrFail looks for block b in the instance's local blockstore and, when
// that lookup returns an error, requests the block through the instance's
// exchange instead. A failed request fails the test via t.Fatal.
//
// wg.Done is called exactly once on every path, including the failing one.
func getOrFail(bitswap instance, b *blocks.Block, t *testing.T, wg *sync.WaitGroup) {
	// Deferred so the WaitGroup is still released when t.Fatal terminates
	// the calling goroutine; otherwise a waiter could block forever.
	defer wg.Done()

	if _, err := bitswap.blockstore.Get(b.Key()); err == nil {
		return // already held locally, nothing to fetch
	}
	if _, err := bitswap.exchange.Block(context.Background(), b.Key()); err != nil {
		t.Fatal(err)
	}
}

149
// TODO simplify this test. get to the _essence_!
150
func TestSendToWantingPeer(t *testing.T) {
151 152
	util.Debug = true

153 154 155 156 157 158 159 160 161
	net := tn.VirtualNetwork()
	rs := tn.VirtualRoutingServer()
	sg := NewSessionGenerator(net, rs)
	bg := NewBlockGenerator(t)

	me := sg.Next()
	w := sg.Next()
	o := sg.Next()

162 163 164 165
	t.Logf("Session %v\n", me.peer.Key().Pretty())
	t.Logf("Session %v\n", w.peer.Key().Pretty())
	t.Logf("Session %v\n", o.peer.Key().Pretty())

166 167
	alpha := bg.Next()

168
	const timeout = 1 * time.Millisecond // FIXME don't depend on time
169

170
	t.Logf("Peer %v attempts to get %v. NB: not available\n", w.peer.Key().Pretty(), alpha.Key().Pretty())
171 172 173
	ctx, _ := context.WithTimeout(context.Background(), timeout)
	_, err := w.exchange.Block(ctx, alpha.Key())
	if err == nil {
174
		t.Fatalf("Expected %v to NOT be available", alpha.Key().Pretty())
175 176 177
	}

	beta := bg.Next()
178
	t.Logf("Peer %v announes availability  of %v\n", w.peer.Key().Pretty(), beta.Key().Pretty())
179
	ctx, _ = context.WithTimeout(context.Background(), timeout)
180 181 182
	if err := w.blockstore.Put(beta); err != nil {
		t.Fatal(err)
	}
183 184
	w.exchange.HasBlock(ctx, beta)

185
	t.Logf("%v gets %v from %v and discovers it wants %v\n", me.peer.Key().Pretty(), beta.Key().Pretty(), w.peer.Key().Pretty(), alpha.Key().Pretty())
186
	ctx, _ = context.WithTimeout(context.Background(), timeout)
187 188 189
	if _, err := me.exchange.Block(ctx, beta.Key()); err != nil {
		t.Fatal(err)
	}
190

191
	t.Logf("%v announces availability of %v\n", o.peer.Key().Pretty(), alpha.Key().Pretty())
192
	ctx, _ = context.WithTimeout(context.Background(), timeout)
193 194 195
	if err := o.blockstore.Put(alpha); err != nil {
		t.Fatal(err)
	}
196 197
	o.exchange.HasBlock(ctx, alpha)

198
	t.Logf("%v requests %v\n", me.peer.Key().Pretty(), alpha.Key().Pretty())
199
	ctx, _ = context.WithTimeout(context.Background(), timeout)
200 201 202
	if _, err := me.exchange.Block(ctx, alpha.Key()); err != nil {
		t.Fatal(err)
	}
203

204
	t.Logf("%v should now have %v\n", w.peer.Key().Pretty(), alpha.Key().Pretty())
205 206 207 208 209
	block, err := w.blockstore.Get(alpha.Key())
	if err != nil {
		t.Fatal("Should not have received an error")
	}
	if block.Key() != alpha.Key() {
210
		t.Fatal("Expected to receive alpha from me")
211
	}
212 213
}

214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238
// NewBlockGenerator returns a BlockGenerator that reports block-creation
// failures on t. Its sequence counter starts at zero.
func NewBlockGenerator(t *testing.T) BlockGenerator {
	var gen BlockGenerator
	gen.T = t
	return gen
}

// BlockGenerator hands out test blocks. Each call to Next advances seq and
// builds a block from the new value.
type BlockGenerator struct {
	*testing.T // b/c block generation can fail
	// seq counts how many blocks have been generated so far.
	seq        int
}

// Next advances the generator's sequence counter and returns a block whose
// content is the UTF-8 encoding of the counter value taken as a code point.
// If the block cannot be created, the embedded *testing.T is passed to
// testutil.NewBlockOrFail to report it.
func (bg *BlockGenerator) Next() blocks.Block {
	bg.seq++
	// The int-to-string conversion is spelled out through rune: a bare
	// string(int) is rejected by go vet (stringintconv), while this form
	// yields exactly the same bytes.
	return testutil.NewBlockOrFail(bg.T, string(rune(bg.seq)))
}

// Blocks calls Next n times and returns pointers to the resulting blocks in
// generation order. For n <= 0 the returned slice is empty but non-nil.
func (bg *BlockGenerator) Blocks(n int) []*blocks.Block {
	generated := make([]*blocks.Block, 0)
	for remaining := n; remaining > 0; remaining-- {
		// blk is a fresh variable on each pass, so every pointer is distinct.
		blk := bg.Next()
		generated = append(generated, &blk)
	}
	return generated
}

239 240 241 242 243 244 245 246 247 248 249 250 251 252 253
// NewSessionGenerator returns a SessionGenerator that creates sessions on
// the given virtual network and routing server. Its sequence counter starts
// at zero.
func NewSessionGenerator(net tn.Network, rs tn.RoutingServer) SessionGenerator {
	var gen SessionGenerator // seq is left at its zero value
	gen.net = net
	gen.rs = rs
	return gen
}

// SessionGenerator creates test bitswap sessions on a shared network and
// routing server, deriving each session's peer ID from an incrementing
// counter.
type SessionGenerator struct {
	// seq is incremented by Next before each session is created.
	seq int
	// net is the network passed to every session this generator creates.
	net tn.Network
	// rs is the routing server passed to every session this generator creates.
	rs  tn.RoutingServer
}

254
func (g *SessionGenerator) Next() instance {
255 256 257 258
	g.seq++
	return session(g.net, g.rs, []byte(string(g.seq)))
}

259 260 261 262 263 264 265 266 267 268
// Instances calls Next n times and returns the resulting sessions in
// creation order. For n <= 0 the returned slice is empty but non-nil.
func (g *SessionGenerator) Instances(n int) []instance {
	created := make([]instance, 0)
	for remaining := n; remaining > 0; remaining-- {
		created = append(created, g.Next())
	}
	return created
}

type instance struct {
Brian Tiger Chow's avatar
Brian Tiger Chow committed
269 270 271 272 273
	peer       *peer.Peer
	exchange   exchange.Interface
	blockstore bstore.Blockstore
}

274 275 276 277 278
// session creates a test bitswap session.
//
// NB: It's easy make mistakes by providing the same peer ID to two different
// sessions. To safeguard, use the SessionGenerator to generate sessions. It's
// just a much better idea.
279
func session(net tn.Network, rs tn.RoutingServer, id peer.ID) instance {
280
	p := &peer.Peer{ID: id}
Brian Tiger Chow's avatar
Brian Tiger Chow committed
281 282 283 284 285

	adapter := net.Adapter(p)
	htc := rs.Client(p)

	blockstore := bstore.NewBlockstore(ds.NewMapDatastore())
286
	const alwaysSendToPeer = true
Brian Tiger Chow's avatar
Brian Tiger Chow committed
287 288 289
	bs := &bitswap{
		blockstore:    blockstore,
		notifications: notifications.New(),
290
		strategy:      strategy.New(alwaysSendToPeer),
Brian Tiger Chow's avatar
Brian Tiger Chow committed
291 292
		routing:       htc,
		sender:        adapter,
293
		wantlist:      util.NewKeySet(),
Brian Tiger Chow's avatar
Brian Tiger Chow committed
294 295
	}
	adapter.SetDelegate(bs)
296
	return instance{
Brian Tiger Chow's avatar
Brian Tiger Chow committed
297 298 299 300 301
		peer:       p,
		exchange:   bs,
		blockstore: blockstore,
	}
}