bitswap_test.go 5.87 KB
Newer Older
1
package bitswap
Brian Tiger Chow's avatar
Brian Tiger Chow committed
2 3

import (
4
	"bytes"
5
	"sync"
Brian Tiger Chow's avatar
Brian Tiger Chow committed
6 7 8 9 10
	"testing"
	"time"

	context "github.com/jbenet/go-ipfs/Godeps/_workspace/src/code.google.com/p/go.net/context"

11
	blocks "github.com/jbenet/go-ipfs/blocks"
12
	tn "github.com/jbenet/go-ipfs/exchange/bitswap/testnet"
Brian Tiger Chow's avatar
Brian Tiger Chow committed
13
	peer "github.com/jbenet/go-ipfs/peer"
14
	mock "github.com/jbenet/go-ipfs/routing/mock"
Brian Tiger Chow's avatar
Brian Tiger Chow committed
15 16
)

17 18 19 20 21 22 23 24 25 26 27
func TestClose(t *testing.T) {
	// TODO
	t.Skip("TODO Bitswap's Close implementation is a WIP")
	vnet := tn.VirtualNetwork()
	rout := mock.VirtualRoutingServer()
	sesgen := NewSessionGenerator(vnet, rout)
	bgen := NewBlockGenerator()

	block := bgen.Next()
	bitswap := sesgen.Next()

28 29
	bitswap.Exchange.Close()
	bitswap.Exchange.GetBlock(context.Background(), block.Key())
30 31
}

Brian Tiger Chow's avatar
Brian Tiger Chow committed
32 33
func TestGetBlockTimeout(t *testing.T) {

34
	net := tn.VirtualNetwork()
35
	rs := mock.VirtualRoutingServer()
36
	g := NewSessionGenerator(net, rs)
37

38
	self := g.Next()
39

Brian Tiger Chow's avatar
Brian Tiger Chow committed
40
	ctx, _ := context.WithTimeout(context.Background(), time.Nanosecond)
41
	block := blocks.NewBlock([]byte("block"))
42
	_, err := self.Exchange.GetBlock(ctx, block.Key())
Brian Tiger Chow's avatar
Brian Tiger Chow committed
43 44 45 46 47 48 49 50

	if err != context.DeadlineExceeded {
		t.Fatal("Expected DeadlineExceeded error")
	}
}

func TestProviderForKeyButNetworkCannotFind(t *testing.T) {

51
	net := tn.VirtualNetwork()
52
	rs := mock.VirtualRoutingServer()
53
	g := NewSessionGenerator(net, rs)
Brian Tiger Chow's avatar
Brian Tiger Chow committed
54

55
	block := blocks.NewBlock([]byte("block"))
56
	rs.Announce(peer.WithIDString("testing"), block.Key()) // but not on network
Brian Tiger Chow's avatar
Brian Tiger Chow committed
57

58
	solo := g.Next()
Brian Tiger Chow's avatar
Brian Tiger Chow committed
59 60

	ctx, _ := context.WithTimeout(context.Background(), time.Nanosecond)
61
	_, err := solo.Exchange.GetBlock(ctx, block.Key())
Brian Tiger Chow's avatar
Brian Tiger Chow committed
62

Brian Tiger Chow's avatar
Brian Tiger Chow committed
63 64 65 66 67
	if err != context.DeadlineExceeded {
		t.Fatal("Expected DeadlineExceeded error")
	}
}

Brian Tiger Chow's avatar
Brian Tiger Chow committed
68 69 70 71
// TestGetBlockAfterRequesting...

func TestGetBlockFromPeerAfterPeerAnnounces(t *testing.T) {

72
	net := tn.VirtualNetwork()
73
	rs := mock.VirtualRoutingServer()
74
	block := blocks.NewBlock([]byte("block"))
75
	g := NewSessionGenerator(net, rs)
Brian Tiger Chow's avatar
Brian Tiger Chow committed
76

77
	hasBlock := g.Next()
Brian Tiger Chow's avatar
Brian Tiger Chow committed
78

79
	if err := hasBlock.Blockstore.Put(block); err != nil {
80 81
		t.Fatal(err)
	}
82
	if err := hasBlock.Exchange.HasBlock(context.Background(), block); err != nil {
83 84
		t.Fatal(err)
	}
Brian Tiger Chow's avatar
Brian Tiger Chow committed
85

86
	wantsBlock := g.Next()
Brian Tiger Chow's avatar
Brian Tiger Chow committed
87 88

	ctx, _ := context.WithTimeout(context.Background(), time.Second)
89
	received, err := wantsBlock.Exchange.GetBlock(ctx, block.Key())
Brian Tiger Chow's avatar
Brian Tiger Chow committed
90 91 92 93
	if err != nil {
		t.Log(err)
		t.Fatal("Expected to succeed")
	}
94 95 96 97

	if !bytes.Equal(block.Data, received.Data) {
		t.Fatal("Data doesn't match")
	}
Brian Tiger Chow's avatar
Brian Tiger Chow committed
98 99
}

100
func TestLargeSwarm(t *testing.T) {
101 102 103
	if testing.Short() {
		t.SkipNow()
	}
104
	t.Parallel()
105
	numInstances := 5
106
	numBlocks := 2
107 108
	PerformDistributionTest(t, numInstances, numBlocks)
}
109

110 111 112
func TestLargeFile(t *testing.T) {
	if testing.Short() {
		t.SkipNow()
113
	}
114 115 116 117
	t.Parallel()
	numInstances := 10
	numBlocks := 100
	PerformDistributionTest(t, numInstances, numBlocks)
118 119
}

120
func PerformDistributionTest(t *testing.T, numInstances, numBlocks int) {
121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137
	if testing.Short() {
		t.SkipNow()
	}
	net := tn.VirtualNetwork()
	rs := mock.VirtualRoutingServer()
	sg := NewSessionGenerator(net, rs)
	bg := NewBlockGenerator()

	t.Log("Test a few nodes trying to get one file with a lot of blocks")

	instances := sg.Instances(numInstances)
	blocks := bg.Blocks(numBlocks)

	t.Log("Give the blocks to the first instance")

	first := instances[0]
	for _, b := range blocks {
138 139 140
		first.Blockstore.Put(b)
		first.Exchange.HasBlock(context.Background(), b)
		rs.Announce(first.Peer, b.Key())
141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160
	}

	t.Log("Distribute!")

	var wg sync.WaitGroup

	for _, inst := range instances {
		for _, b := range blocks {
			wg.Add(1)
			// NB: executing getOrFail concurrently puts tremendous pressure on
			// the goroutine scheduler
			getOrFail(inst, b, t, &wg)
		}
	}
	wg.Wait()

	t.Log("Verify!")

	for _, inst := range instances {
		for _, b := range blocks {
161
			if _, err := inst.Blockstore.Get(b.Key()); err != nil {
162 163 164 165 166 167
				t.Fatal(err)
			}
		}
	}
}

168
func getOrFail(bitswap Instance, b *blocks.Block, t *testing.T, wg *sync.WaitGroup) {
169 170
	if _, err := bitswap.Blockstore.Get(b.Key()); err != nil {
		_, err := bitswap.Exchange.GetBlock(context.Background(), b.Key())
171 172 173 174 175 176 177
		if err != nil {
			t.Fatal(err)
		}
	}
	wg.Done()
}

178
// TODO simplify this test. get to the _essence_!
179
func TestSendToWantingPeer(t *testing.T) {
Brian Tiger Chow's avatar
Brian Tiger Chow committed
180 181 182 183
	if testing.Short() {
		t.SkipNow()
	}

184
	net := tn.VirtualNetwork()
185
	rs := mock.VirtualRoutingServer()
186
	sg := NewSessionGenerator(net, rs)
187
	bg := NewBlockGenerator()
188 189 190 191 192

	me := sg.Next()
	w := sg.Next()
	o := sg.Next()

193 194 195
	t.Logf("Session %v\n", me.Peer)
	t.Logf("Session %v\n", w.Peer)
	t.Logf("Session %v\n", o.Peer)
196

197 198
	alpha := bg.Next()

Brian Tiger Chow's avatar
Brian Tiger Chow committed
199
	const timeout = 100 * time.Millisecond // FIXME don't depend on time
200

201
	t.Logf("Peer %v attempts to get %v. NB: not available\n", w.Peer, alpha.Key())
202
	ctx, _ := context.WithTimeout(context.Background(), timeout)
203
	_, err := w.Exchange.GetBlock(ctx, alpha.Key())
204
	if err == nil {
205
		t.Fatalf("Expected %v to NOT be available", alpha.Key())
206 207 208
	}

	beta := bg.Next()
209
	t.Logf("Peer %v announes availability  of %v\n", w.Peer, beta.Key())
210
	ctx, _ = context.WithTimeout(context.Background(), timeout)
211
	if err := w.Blockstore.Put(beta); err != nil {
212 213
		t.Fatal(err)
	}
214
	w.Exchange.HasBlock(ctx, beta)
215

216
	t.Logf("%v gets %v from %v and discovers it wants %v\n", me.Peer, beta.Key(), w.Peer, alpha.Key())
217
	ctx, _ = context.WithTimeout(context.Background(), timeout)
218
	if _, err := me.Exchange.GetBlock(ctx, beta.Key()); err != nil {
219 220
		t.Fatal(err)
	}
221

222
	t.Logf("%v announces availability of %v\n", o.Peer, alpha.Key())
223
	ctx, _ = context.WithTimeout(context.Background(), timeout)
224
	if err := o.Blockstore.Put(alpha); err != nil {
225 226
		t.Fatal(err)
	}
227
	o.Exchange.HasBlock(ctx, alpha)
228

229
	t.Logf("%v requests %v\n", me.Peer, alpha.Key())
230
	ctx, _ = context.WithTimeout(context.Background(), timeout)
231
	if _, err := me.Exchange.GetBlock(ctx, alpha.Key()); err != nil {
232 233
		t.Fatal(err)
	}
234

235 236
	t.Logf("%v should now have %v\n", w.Peer, alpha.Key())
	block, err := w.Blockstore.Get(alpha.Key())
237 238 239 240
	if err != nil {
		t.Fatal("Should not have received an error")
	}
	if block.Key() != alpha.Key() {
241
		t.Fatal("Expected to receive alpha from me")
242
	}
243
}