bitswap_test.go 6.6 KB
Newer Older
1
package bitswap
Brian Tiger Chow's avatar
Brian Tiger Chow committed
2 3

import (
4
	"bytes"
5
	"sync"
Brian Tiger Chow's avatar
Brian Tiger Chow committed
6 7 8 9
	"testing"
	"time"

	context "github.com/jbenet/go-ipfs/Godeps/_workspace/src/code.google.com/p/go.net/context"
10

11
	blocks "github.com/jbenet/go-ipfs/blocks"
12
	blocksutil "github.com/jbenet/go-ipfs/blocks/blocksutil"
13
	tn "github.com/jbenet/go-ipfs/exchange/bitswap/testnet"
14
	peer "github.com/jbenet/go-ipfs/peer"
Brian Tiger Chow's avatar
Brian Tiger Chow committed
15
	mockrouting "github.com/jbenet/go-ipfs/routing/mock"
Jeromy's avatar
Jeromy committed
16
	u "github.com/jbenet/go-ipfs/util"
17
	delay "github.com/jbenet/go-ipfs/util/delay"
Brian Tiger Chow's avatar
Brian Tiger Chow committed
18 19
)

Brian Tiger Chow's avatar
Brian Tiger Chow committed
20 21
// FIXME the tests are really sensitive to the network delay. fix them to work
// well under varying conditions
22 23
const kNetworkDelay = 0 * time.Millisecond

24 25 26
func TestClose(t *testing.T) {
	// TODO
	t.Skip("TODO Bitswap's Close implementation is a WIP")
27 28
	vnet := tn.VirtualNetwork(mockrouting.NewServer(), delay.Fixed(kNetworkDelay))
	sesgen := NewSessionGenerator(vnet)
Jeromy's avatar
Jeromy committed
29
	defer sesgen.Close()
30
	bgen := blocksutil.NewBlockGenerator()
31 32 33 34

	block := bgen.Next()
	bitswap := sesgen.Next()

35 36
	bitswap.Exchange.Close()
	bitswap.Exchange.GetBlock(context.Background(), block.Key())
37 38
}

Brian Tiger Chow's avatar
Brian Tiger Chow committed
39 40
func TestGetBlockTimeout(t *testing.T) {

41 42
	net := tn.VirtualNetwork(mockrouting.NewServer(), delay.Fixed(kNetworkDelay))
	g := NewSessionGenerator(net)
Jeromy's avatar
Jeromy committed
43
	defer g.Close()
44

45
	self := g.Next()
46

Brian Tiger Chow's avatar
Brian Tiger Chow committed
47
	ctx, _ := context.WithTimeout(context.Background(), time.Nanosecond)
48
	block := blocks.NewBlock([]byte("block"))
49
	_, err := self.Exchange.GetBlock(ctx, block.Key())
Brian Tiger Chow's avatar
Brian Tiger Chow committed
50 51 52 53 54 55

	if err != context.DeadlineExceeded {
		t.Fatal("Expected DeadlineExceeded error")
	}
}

56
func TestProviderForKeyButNetworkCannotFind(t *testing.T) { // TODO revisit this
Brian Tiger Chow's avatar
Brian Tiger Chow committed
57

Brian Tiger Chow's avatar
Brian Tiger Chow committed
58
	rs := mockrouting.NewServer()
59 60
	net := tn.VirtualNetwork(rs, delay.Fixed(kNetworkDelay))
	g := NewSessionGenerator(net)
Jeromy's avatar
Jeromy committed
61
	defer g.Close()
Brian Tiger Chow's avatar
Brian Tiger Chow committed
62

63
	block := blocks.NewBlock([]byte("block"))
64 65
	pinfo := peer.PeerInfo{ID: peer.ID("testing")}
	rs.Client(pinfo).Provide(context.Background(), block.Key()) // but not on network
Brian Tiger Chow's avatar
Brian Tiger Chow committed
66

67
	solo := g.Next()
Jeromy's avatar
Jeromy committed
68
	defer solo.Exchange.Close()
Brian Tiger Chow's avatar
Brian Tiger Chow committed
69 70

	ctx, _ := context.WithTimeout(context.Background(), time.Nanosecond)
71
	_, err := solo.Exchange.GetBlock(ctx, block.Key())
Brian Tiger Chow's avatar
Brian Tiger Chow committed
72

Brian Tiger Chow's avatar
Brian Tiger Chow committed
73 74 75 76 77
	if err != context.DeadlineExceeded {
		t.Fatal("Expected DeadlineExceeded error")
	}
}

Brian Tiger Chow's avatar
Brian Tiger Chow committed
78 79 80 81
// TestGetBlockAfterRequesting...

func TestGetBlockFromPeerAfterPeerAnnounces(t *testing.T) {

82
	net := tn.VirtualNetwork(mockrouting.NewServer(), delay.Fixed(kNetworkDelay))
83
	block := blocks.NewBlock([]byte("block"))
84
	g := NewSessionGenerator(net)
Jeromy's avatar
Jeromy committed
85
	defer g.Close()
Brian Tiger Chow's avatar
Brian Tiger Chow committed
86

87
	hasBlock := g.Next()
Jeromy's avatar
Jeromy committed
88
	defer hasBlock.Exchange.Close()
Brian Tiger Chow's avatar
Brian Tiger Chow committed
89

90
	if err := hasBlock.Blockstore().Put(block); err != nil {
91 92
		t.Fatal(err)
	}
93
	if err := hasBlock.Exchange.HasBlock(context.Background(), block); err != nil {
94 95
		t.Fatal(err)
	}
Brian Tiger Chow's avatar
Brian Tiger Chow committed
96

97
	wantsBlock := g.Next()
Jeromy's avatar
Jeromy committed
98
	defer wantsBlock.Exchange.Close()
Brian Tiger Chow's avatar
Brian Tiger Chow committed
99 100

	ctx, _ := context.WithTimeout(context.Background(), time.Second)
101
	received, err := wantsBlock.Exchange.GetBlock(ctx, block.Key())
Brian Tiger Chow's avatar
Brian Tiger Chow committed
102 103 104 105
	if err != nil {
		t.Log(err)
		t.Fatal("Expected to succeed")
	}
106 107 108 109

	if !bytes.Equal(block.Data, received.Data) {
		t.Fatal("Data doesn't match")
	}
Brian Tiger Chow's avatar
Brian Tiger Chow committed
110 111
}

112
func TestLargeSwarm(t *testing.T) {
113 114 115
	if testing.Short() {
		t.SkipNow()
	}
116
	t.Parallel()
Jeromy's avatar
Jeromy committed
117
	numInstances := 500
118
	numBlocks := 2
119 120
	PerformDistributionTest(t, numInstances, numBlocks)
}
121

122 123 124
func TestLargeFile(t *testing.T) {
	if testing.Short() {
		t.SkipNow()
125
	}
126 127 128 129
	t.Parallel()
	numInstances := 10
	numBlocks := 100
	PerformDistributionTest(t, numInstances, numBlocks)
130 131
}

132
func PerformDistributionTest(t *testing.T, numInstances, numBlocks int) {
133 134 135
	if testing.Short() {
		t.SkipNow()
	}
136 137
	net := tn.VirtualNetwork(mockrouting.NewServer(), delay.Fixed(kNetworkDelay))
	sg := NewSessionGenerator(net)
Jeromy's avatar
Jeromy committed
138
	defer sg.Close()
139
	bg := blocksutil.NewBlockGenerator()
140 141 142 143 144 145 146 147

	t.Log("Test a few nodes trying to get one file with a lot of blocks")

	instances := sg.Instances(numInstances)
	blocks := bg.Blocks(numBlocks)

	t.Log("Give the blocks to the first instance")

Jeromy's avatar
Jeromy committed
148
	var blkeys []u.Key
149 150
	first := instances[0]
	for _, b := range blocks {
151
		first.Blockstore().Put(b) // TODO remove. don't need to do this. bitswap owns block
Jeromy's avatar
Jeromy committed
152
		blkeys = append(blkeys, b.Key())
153
		first.Exchange.HasBlock(context.Background(), b)
154 155 156 157
	}

	t.Log("Distribute!")

Jeromy's avatar
Jeromy committed
158
	wg := sync.WaitGroup{}
159
	for _, inst := range instances {
Jeromy's avatar
Jeromy committed
160 161 162 163 164 165 166 167 168 169
		wg.Add(1)
		go func(inst Instance) {
			defer wg.Done()
			outch, err := inst.Exchange.GetBlocks(context.TODO(), blkeys)
			if err != nil {
				t.Fatal(err)
			}
			for _ = range outch {
			}
		}(inst)
170 171 172 173 174 175 176
	}
	wg.Wait()

	t.Log("Verify!")

	for _, inst := range instances {
		for _, b := range blocks {
177
			if _, err := inst.Blockstore().Get(b.Key()); err != nil {
178 179 180 181 182 183
				t.Fatal(err)
			}
		}
	}
}

184
func getOrFail(bitswap Instance, b *blocks.Block, t *testing.T, wg *sync.WaitGroup) {
185
	if _, err := bitswap.Blockstore().Get(b.Key()); err != nil {
186
		_, err := bitswap.Exchange.GetBlock(context.Background(), b.Key())
187 188 189 190 191 192 193
		if err != nil {
			t.Fatal(err)
		}
	}
	wg.Done()
}

194
// TODO simplify this test. get to the _essence_!
195
func TestSendToWantingPeer(t *testing.T) {
Brian Tiger Chow's avatar
Brian Tiger Chow committed
196 197 198 199
	if testing.Short() {
		t.SkipNow()
	}

200 201
	net := tn.VirtualNetwork(mockrouting.NewServer(), delay.Fixed(kNetworkDelay))
	sg := NewSessionGenerator(net)
Jeromy's avatar
Jeromy committed
202
	defer sg.Close()
203
	bg := blocksutil.NewBlockGenerator()
204

Brian Tiger Chow's avatar
Brian Tiger Chow committed
205 206
	prev := rebroadcastDelay.Set(time.Second / 2)
	defer func() { rebroadcastDelay.Set(prev) }()
207

208 209
	peerA := sg.Next()
	peerB := sg.Next()
210

211 212
	t.Logf("Session %v\n", peerA.Peer)
	t.Logf("Session %v\n", peerB.Peer)
213

214 215
	timeout := time.Second
	waitTime := time.Second * 5
216

217 218 219 220 221
	alpha := bg.Next()
	// peerA requests and waits for block alpha
	ctx, _ := context.WithTimeout(context.TODO(), waitTime)
	alphaPromise, err := peerA.Exchange.GetBlocks(ctx, []u.Key{alpha.Key()})
	if err != nil {
222 223
		t.Fatal(err)
	}
224

225 226 227 228
	// peerB announces to the network that he has block alpha
	ctx, _ = context.WithTimeout(context.TODO(), timeout)
	err = peerB.Exchange.HasBlock(ctx, alpha)
	if err != nil {
229 230
		t.Fatal(err)
	}
231

232 233 234 235
	// At some point, peerA should get alpha (or timeout)
	blkrecvd, ok := <-alphaPromise
	if !ok {
		t.Fatal("context timed out and broke promise channel!")
236
	}
237

238 239
	if blkrecvd.Key() != alpha.Key() {
		t.Fatal("Wrong block!")
240
	}
241

242
}
Jeromy's avatar
Jeromy committed
243 244

func TestBasicBitswap(t *testing.T) {
245 246
	net := tn.VirtualNetwork(mockrouting.NewServer(), delay.Fixed(kNetworkDelay))
	sg := NewSessionGenerator(net)
Jeromy's avatar
Jeromy committed
247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271
	bg := blocksutil.NewBlockGenerator()

	t.Log("Test a few nodes trying to get one file with a lot of blocks")

	instances := sg.Instances(2)
	blocks := bg.Blocks(1)
	err := instances[0].Exchange.HasBlock(context.TODO(), blocks[0])
	if err != nil {
		t.Fatal(err)
	}

	ctx, _ := context.WithTimeout(context.TODO(), time.Second*5)
	blk, err := instances[1].Exchange.GetBlock(ctx, blocks[0].Key())
	if err != nil {
		t.Fatal(err)
	}

	t.Log(blk)
	for _, inst := range instances {
		err := inst.Exchange.Close()
		if err != nil {
			t.Fatal(err)
		}
	}
}