bitswap_test.go 6.89 KB
Newer Older
Brian Tiger Chow's avatar
Brian Tiger Chow committed
1 2 3
package bitswap

import (
4
	"bytes"
5
	"sync"
Brian Tiger Chow's avatar
Brian Tiger Chow committed
6 7 8 9 10 11
	"testing"
	"time"

	context "github.com/jbenet/go-ipfs/Godeps/_workspace/src/code.google.com/p/go.net/context"

	ds "github.com/jbenet/go-ipfs/Godeps/_workspace/src/github.com/jbenet/datastore.go"
12
	"github.com/jbenet/go-ipfs/blocks"
Brian Tiger Chow's avatar
Brian Tiger Chow committed
13 14 15 16
	bstore "github.com/jbenet/go-ipfs/blockstore"
	exchange "github.com/jbenet/go-ipfs/exchange"
	notifications "github.com/jbenet/go-ipfs/exchange/bitswap/notifications"
	strategy "github.com/jbenet/go-ipfs/exchange/bitswap/strategy"
17
	tn "github.com/jbenet/go-ipfs/exchange/bitswap/testnet"
Brian Tiger Chow's avatar
Brian Tiger Chow committed
18 19 20 21 22 23
	peer "github.com/jbenet/go-ipfs/peer"
	testutil "github.com/jbenet/go-ipfs/util/testutil"
)

func TestGetBlockTimeout(t *testing.T) {

24 25 26
	net := tn.VirtualNetwork()
	rs := tn.VirtualRoutingServer()
	g := NewSessionGenerator(net, rs)
27

28
	self := g.Next()
29

Brian Tiger Chow's avatar
Brian Tiger Chow committed
30 31
	ctx, _ := context.WithTimeout(context.Background(), time.Nanosecond)
	block := testutil.NewBlockOrFail(t, "block")
32
	_, err := self.exchange.Block(ctx, block.Key())
Brian Tiger Chow's avatar
Brian Tiger Chow committed
33 34 35 36 37 38 39 40

	if err != context.DeadlineExceeded {
		t.Fatal("Expected DeadlineExceeded error")
	}
}

func TestProviderForKeyButNetworkCannotFind(t *testing.T) {

41 42 43
	net := tn.VirtualNetwork()
	rs := tn.VirtualRoutingServer()
	g := NewSessionGenerator(net, rs)
Brian Tiger Chow's avatar
Brian Tiger Chow committed
44

Brian Tiger Chow's avatar
Brian Tiger Chow committed
45
	block := testutil.NewBlockOrFail(t, "block")
Brian Tiger Chow's avatar
Brian Tiger Chow committed
46 47
	rs.Announce(&peer.Peer{}, block.Key()) // but not on network

48
	solo := g.Next()
Brian Tiger Chow's avatar
Brian Tiger Chow committed
49 50 51 52

	ctx, _ := context.WithTimeout(context.Background(), time.Nanosecond)
	_, err := solo.exchange.Block(ctx, block.Key())

Brian Tiger Chow's avatar
Brian Tiger Chow committed
53 54 55 56 57
	if err != context.DeadlineExceeded {
		t.Fatal("Expected DeadlineExceeded error")
	}
}

Brian Tiger Chow's avatar
Brian Tiger Chow committed
58 59 60 61
// TestGetBlockAfterRequesting...

func TestGetBlockFromPeerAfterPeerAnnounces(t *testing.T) {

62 63
	net := tn.VirtualNetwork()
	rs := tn.VirtualRoutingServer()
Brian Tiger Chow's avatar
Brian Tiger Chow committed
64
	block := testutil.NewBlockOrFail(t, "block")
65
	g := NewSessionGenerator(net, rs)
Brian Tiger Chow's avatar
Brian Tiger Chow committed
66

67
	hasBlock := g.Next()
Brian Tiger Chow's avatar
Brian Tiger Chow committed
68

69 70 71 72 73 74
	if err := hasBlock.blockstore.Put(block); err != nil {
		t.Fatal(err)
	}
	if err := hasBlock.exchange.HasBlock(context.Background(), block); err != nil {
		t.Fatal(err)
	}
Brian Tiger Chow's avatar
Brian Tiger Chow committed
75

76
	wantsBlock := g.Next()
Brian Tiger Chow's avatar
Brian Tiger Chow committed
77 78

	ctx, _ := context.WithTimeout(context.Background(), time.Second)
79
	received, err := wantsBlock.exchange.Block(ctx, block.Key())
Brian Tiger Chow's avatar
Brian Tiger Chow committed
80 81 82 83
	if err != nil {
		t.Log(err)
		t.Fatal("Expected to succeed")
	}
84 85 86 87

	if !bytes.Equal(block.Data, received.Data) {
		t.Fatal("Data doesn't match")
	}
Brian Tiger Chow's avatar
Brian Tiger Chow committed
88 89
}

90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147
func TestSwarm(t *testing.T) {
	net := tn.VirtualNetwork()
	rs := tn.VirtualRoutingServer()
	sg := NewSessionGenerator(net, rs)
	bg := NewBlockGenerator(t)

	t.Log("Create a ton of instances, and just a few blocks")

	numInstances := 500
	numBlocks := 2

	instances := sg.Instances(numInstances)
	blocks := bg.Blocks(numBlocks)

	t.Log("Give the blocks to the first instance")

	first := instances[0]
	for _, b := range blocks {
		first.blockstore.Put(*b)
		first.exchange.HasBlock(context.Background(), *b)
		rs.Announce(first.peer, b.Key())
	}

	t.Log("Distribute!")

	var wg sync.WaitGroup

	for _, inst := range instances {
		for _, b := range blocks {
			wg.Add(1)
			// NB: executing getOrFail concurrently puts tremendous pressure on
			// the goroutine scheduler
			getOrFail(inst, b, t, &wg)
		}
	}
	wg.Wait()

	t.Log("Verify!")

	for _, inst := range instances {
		for _, b := range blocks {
			if _, err := inst.blockstore.Get(b.Key()); err != nil {
				t.Fatal(err)
			}
		}
	}
}

func getOrFail(bitswap instance, b *blocks.Block, t *testing.T, wg *sync.WaitGroup) {
	if _, err := bitswap.blockstore.Get(b.Key()); err != nil {
		_, err := bitswap.exchange.Block(context.Background(), b.Key())
		if err != nil {
			t.Fatal(err)
		}
	}
	wg.Done()
}

148
func TestSendToWantingPeer(t *testing.T) {
149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192
	net := tn.VirtualNetwork()
	rs := tn.VirtualRoutingServer()
	sg := NewSessionGenerator(net, rs)
	bg := NewBlockGenerator(t)

	me := sg.Next()
	w := sg.Next()
	o := sg.Next()

	alpha := bg.Next()

	const timeout = 100 * time.Millisecond
	const wait = 100 * time.Millisecond

	t.Log("Peer |w| attempts to get a file |alpha|. NB: alpha not available")
	ctx, _ := context.WithTimeout(context.Background(), timeout)
	_, err := w.exchange.Block(ctx, alpha.Key())
	if err == nil {
		t.Error("Expected alpha to NOT be available")
	}
	time.Sleep(wait)

	t.Log("Peer |w| announces availability of a file |beta|")
	beta := bg.Next()
	ctx, _ = context.WithTimeout(context.Background(), timeout)
	w.exchange.HasBlock(ctx, beta)
	time.Sleep(wait)

	t.Log("I request and get |beta| from |w|. In the message, I receive |w|'s wants [alpha]")
	t.Log("I don't have alpha, but I keep it on my wantlist.")
	ctx, _ = context.WithTimeout(context.Background(), timeout)
	me.exchange.Block(ctx, beta.Key())
	time.Sleep(wait)

	t.Log("Peer |o| announces the availability of |alpha|")
	ctx, _ = context.WithTimeout(context.Background(), timeout)
	o.exchange.HasBlock(ctx, alpha)
	time.Sleep(wait)

	t.Log("I request |alpha| for myself.")
	ctx, _ = context.WithTimeout(context.Background(), timeout)
	me.exchange.Block(ctx, alpha.Key())
	time.Sleep(wait)

193
	t.Log("After receiving |f| from |o|, I send it to the wanting peer |w|")
194 195 196 197 198 199 200
	block, err := w.blockstore.Get(alpha.Key())
	if err != nil {
		t.Fatal("Should not have received an error")
	}
	if block.Key() != alpha.Key() {
		t.Error("Expected to receive alpha from me")
	}
201 202
}

203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227
func NewBlockGenerator(t *testing.T) BlockGenerator {
	return BlockGenerator{
		T: t,
	}
}

type BlockGenerator struct {
	*testing.T // b/c block generation can fail
	seq        int
}

func (bg *BlockGenerator) Next() blocks.Block {
	bg.seq++
	return testutil.NewBlockOrFail(bg.T, string(bg.seq))
}

func (bg *BlockGenerator) Blocks(n int) []*blocks.Block {
	blocks := make([]*blocks.Block, 0)
	for i := 0; i < n; i++ {
		b := bg.Next()
		blocks = append(blocks, &b)
	}
	return blocks
}

228 229 230 231 232 233 234 235 236 237 238 239 240 241 242
func NewSessionGenerator(
	net tn.Network, rs tn.RoutingServer) SessionGenerator {
	return SessionGenerator{
		net: net,
		rs:  rs,
		seq: 0,
	}
}

type SessionGenerator struct {
	seq int
	net tn.Network
	rs  tn.RoutingServer
}

243
func (g *SessionGenerator) Next() instance {
244 245 246 247
	g.seq++
	return session(g.net, g.rs, []byte(string(g.seq)))
}

248 249 250 251 252 253 254 255 256 257
func (g *SessionGenerator) Instances(n int) []instance {
	instances := make([]instance, 0)
	for j := 0; j < n; j++ {
		inst := g.Next()
		instances = append(instances, inst)
	}
	return instances
}

type instance struct {
Brian Tiger Chow's avatar
Brian Tiger Chow committed
258 259 260 261 262
	peer       *peer.Peer
	exchange   exchange.Interface
	blockstore bstore.Blockstore
}

263 264 265 266 267
// session creates a test bitswap session.
//
// NB: It's easy make mistakes by providing the same peer ID to two different
// sessions. To safeguard, use the SessionGenerator to generate sessions. It's
// just a much better idea.
268
func session(net tn.Network, rs tn.RoutingServer, id peer.ID) instance {
269
	p := &peer.Peer{ID: id}
Brian Tiger Chow's avatar
Brian Tiger Chow committed
270 271 272 273 274 275 276 277 278 279 280 281 282

	adapter := net.Adapter(p)
	htc := rs.Client(p)

	blockstore := bstore.NewBlockstore(ds.NewMapDatastore())
	bs := &bitswap{
		blockstore:    blockstore,
		notifications: notifications.New(),
		strategy:      strategy.New(),
		routing:       htc,
		sender:        adapter,
	}
	adapter.SetDelegate(bs)
283
	return instance{
Brian Tiger Chow's avatar
Brian Tiger Chow committed
284 285 286 287 288
		peer:       p,
		exchange:   bs,
		blockstore: blockstore,
	}
}