bitswap_test.go 6.66 KB
Newer Older
1
package bitswap
Brian Tiger Chow's avatar
Brian Tiger Chow committed
2 3

import (
4
	"bytes"
5
	"sync"
Brian Tiger Chow's avatar
Brian Tiger Chow committed
6 7 8 9
	"testing"
	"time"

	context "github.com/jbenet/go-ipfs/Godeps/_workspace/src/code.google.com/p/go.net/context"
10

11
	blocks "github.com/jbenet/go-ipfs/blocks"
12
	blocksutil "github.com/jbenet/go-ipfs/blocks/blocksutil"
13
	tn "github.com/jbenet/go-ipfs/exchange/bitswap/testnet"
14
	peer "github.com/jbenet/go-ipfs/peer"
Brian Tiger Chow's avatar
Brian Tiger Chow committed
15
	mockrouting "github.com/jbenet/go-ipfs/routing/mock"
Jeromy's avatar
Jeromy committed
16
	u "github.com/jbenet/go-ipfs/util"
17
	delay "github.com/jbenet/go-ipfs/util/delay"
Brian Tiger Chow's avatar
Brian Tiger Chow committed
18 19
)

Brian Tiger Chow's avatar
Brian Tiger Chow committed
20 21
// FIXME the tests are really sensitive to the network delay. fix them to work
// well under varying conditions
22 23
const kNetworkDelay = 0 * time.Millisecond

24 25 26
func TestClose(t *testing.T) {
	// TODO
	t.Skip("TODO Bitswap's Close implementation is a WIP")
27
	vnet := tn.VirtualNetwork(delay.Fixed(kNetworkDelay))
Brian Tiger Chow's avatar
Brian Tiger Chow committed
28
	rout := mockrouting.NewServer()
29
	sesgen := NewSessionGenerator(vnet, rout)
Jeromy's avatar
Jeromy committed
30
	defer sesgen.Close()
31
	bgen := blocksutil.NewBlockGenerator()
32 33 34 35

	block := bgen.Next()
	bitswap := sesgen.Next()

36 37
	bitswap.Exchange.Close()
	bitswap.Exchange.GetBlock(context.Background(), block.Key())
38 39
}

Brian Tiger Chow's avatar
Brian Tiger Chow committed
40 41
func TestGetBlockTimeout(t *testing.T) {

42
	net := tn.VirtualNetwork(delay.Fixed(kNetworkDelay))
Brian Tiger Chow's avatar
Brian Tiger Chow committed
43
	rs := mockrouting.NewServer()
44
	g := NewSessionGenerator(net, rs)
Jeromy's avatar
Jeromy committed
45
	defer g.Close()
46

47
	self := g.Next()
48

Brian Tiger Chow's avatar
Brian Tiger Chow committed
49
	ctx, _ := context.WithTimeout(context.Background(), time.Nanosecond)
50
	block := blocks.NewBlock([]byte("block"))
51
	_, err := self.Exchange.GetBlock(ctx, block.Key())
Brian Tiger Chow's avatar
Brian Tiger Chow committed
52 53 54 55 56 57 58 59

	if err != context.DeadlineExceeded {
		t.Fatal("Expected DeadlineExceeded error")
	}
}

func TestProviderForKeyButNetworkCannotFind(t *testing.T) {

60
	net := tn.VirtualNetwork(delay.Fixed(kNetworkDelay))
Brian Tiger Chow's avatar
Brian Tiger Chow committed
61
	rs := mockrouting.NewServer()
62
	g := NewSessionGenerator(net, rs)
Jeromy's avatar
Jeromy committed
63
	defer g.Close()
Brian Tiger Chow's avatar
Brian Tiger Chow committed
64

65
	block := blocks.NewBlock([]byte("block"))
66 67
	pinfo := peer.PeerInfo{ID: peer.ID("testing")}
	rs.Client(pinfo).Provide(context.Background(), block.Key()) // but not on network
Brian Tiger Chow's avatar
Brian Tiger Chow committed
68

69
	solo := g.Next()
Jeromy's avatar
Jeromy committed
70
	defer solo.Exchange.Close()
Brian Tiger Chow's avatar
Brian Tiger Chow committed
71 72

	ctx, _ := context.WithTimeout(context.Background(), time.Nanosecond)
73
	_, err := solo.Exchange.GetBlock(ctx, block.Key())
Brian Tiger Chow's avatar
Brian Tiger Chow committed
74

Brian Tiger Chow's avatar
Brian Tiger Chow committed
75 76 77 78 79
	if err != context.DeadlineExceeded {
		t.Fatal("Expected DeadlineExceeded error")
	}
}

Brian Tiger Chow's avatar
Brian Tiger Chow committed
80 81 82 83
// TestGetBlockAfterRequesting...

func TestGetBlockFromPeerAfterPeerAnnounces(t *testing.T) {

84
	net := tn.VirtualNetwork(delay.Fixed(kNetworkDelay))
Brian Tiger Chow's avatar
Brian Tiger Chow committed
85
	rs := mockrouting.NewServer()
86
	block := blocks.NewBlock([]byte("block"))
87
	g := NewSessionGenerator(net, rs)
Jeromy's avatar
Jeromy committed
88
	defer g.Close()
Brian Tiger Chow's avatar
Brian Tiger Chow committed
89

90
	hasBlock := g.Next()
Jeromy's avatar
Jeromy committed
91
	defer hasBlock.Exchange.Close()
Brian Tiger Chow's avatar
Brian Tiger Chow committed
92

93
	if err := hasBlock.Blockstore().Put(block); err != nil {
94 95
		t.Fatal(err)
	}
96
	if err := hasBlock.Exchange.HasBlock(context.Background(), block); err != nil {
97 98
		t.Fatal(err)
	}
Brian Tiger Chow's avatar
Brian Tiger Chow committed
99

100
	wantsBlock := g.Next()
Jeromy's avatar
Jeromy committed
101
	defer wantsBlock.Exchange.Close()
Brian Tiger Chow's avatar
Brian Tiger Chow committed
102 103

	ctx, _ := context.WithTimeout(context.Background(), time.Second)
104
	received, err := wantsBlock.Exchange.GetBlock(ctx, block.Key())
Brian Tiger Chow's avatar
Brian Tiger Chow committed
105 106 107 108
	if err != nil {
		t.Log(err)
		t.Fatal("Expected to succeed")
	}
109 110 111 112

	if !bytes.Equal(block.Data, received.Data) {
		t.Fatal("Data doesn't match")
	}
Brian Tiger Chow's avatar
Brian Tiger Chow committed
113 114
}

115
func TestLargeSwarm(t *testing.T) {
116 117 118
	if testing.Short() {
		t.SkipNow()
	}
119
	t.Parallel()
Jeromy's avatar
Jeromy committed
120
	numInstances := 500
121
	numBlocks := 2
122 123
	PerformDistributionTest(t, numInstances, numBlocks)
}
124

125 126 127
func TestLargeFile(t *testing.T) {
	if testing.Short() {
		t.SkipNow()
128
	}
129 130 131 132
	t.Parallel()
	numInstances := 10
	numBlocks := 100
	PerformDistributionTest(t, numInstances, numBlocks)
133 134
}

135
func PerformDistributionTest(t *testing.T, numInstances, numBlocks int) {
136 137 138
	if testing.Short() {
		t.SkipNow()
	}
139
	net := tn.VirtualNetwork(delay.Fixed(kNetworkDelay))
Brian Tiger Chow's avatar
Brian Tiger Chow committed
140
	rs := mockrouting.NewServer()
141
	sg := NewSessionGenerator(net, rs)
Jeromy's avatar
Jeromy committed
142
	defer sg.Close()
143
	bg := blocksutil.NewBlockGenerator()
144 145 146 147 148 149 150 151

	t.Log("Test a few nodes trying to get one file with a lot of blocks")

	instances := sg.Instances(numInstances)
	blocks := bg.Blocks(numBlocks)

	t.Log("Give the blocks to the first instance")

Jeromy's avatar
Jeromy committed
152
	var blkeys []u.Key
153 154
	first := instances[0]
	for _, b := range blocks {
155
		first.Blockstore().Put(b)
Jeromy's avatar
Jeromy committed
156
		blkeys = append(blkeys, b.Key())
157
		first.Exchange.HasBlock(context.Background(), b)
158
		rs.Client(peer.PeerInfo{ID: first.Peer}).Provide(context.Background(), b.Key())
159 160 161 162
	}

	t.Log("Distribute!")

Jeromy's avatar
Jeromy committed
163
	wg := sync.WaitGroup{}
164
	for _, inst := range instances {
Jeromy's avatar
Jeromy committed
165 166 167 168 169 170 171 172 173 174
		wg.Add(1)
		go func(inst Instance) {
			defer wg.Done()
			outch, err := inst.Exchange.GetBlocks(context.TODO(), blkeys)
			if err != nil {
				t.Fatal(err)
			}
			for _ = range outch {
			}
		}(inst)
175 176 177 178 179 180 181
	}
	wg.Wait()

	t.Log("Verify!")

	for _, inst := range instances {
		for _, b := range blocks {
182
			if _, err := inst.Blockstore().Get(b.Key()); err != nil {
183 184 185 186 187 188
				t.Fatal(err)
			}
		}
	}
}

189
func getOrFail(bitswap Instance, b *blocks.Block, t *testing.T, wg *sync.WaitGroup) {
190
	if _, err := bitswap.Blockstore().Get(b.Key()); err != nil {
191
		_, err := bitswap.Exchange.GetBlock(context.Background(), b.Key())
192 193 194 195 196 197 198
		if err != nil {
			t.Fatal(err)
		}
	}
	wg.Done()
}

199
// TODO simplify this test. get to the _essence_!
200
func TestSendToWantingPeer(t *testing.T) {
Brian Tiger Chow's avatar
Brian Tiger Chow committed
201 202 203 204
	if testing.Short() {
		t.SkipNow()
	}

205
	net := tn.VirtualNetwork(delay.Fixed(kNetworkDelay))
Brian Tiger Chow's avatar
Brian Tiger Chow committed
206
	rs := mockrouting.NewServer()
207
	sg := NewSessionGenerator(net, rs)
Jeromy's avatar
Jeromy committed
208
	defer sg.Close()
209
	bg := blocksutil.NewBlockGenerator()
210

Brian Tiger Chow's avatar
Brian Tiger Chow committed
211 212
	prev := rebroadcastDelay.Set(time.Second / 2)
	defer func() { rebroadcastDelay.Set(prev) }()
213

214 215
	peerA := sg.Next()
	peerB := sg.Next()
216

217 218
	t.Logf("Session %v\n", peerA.Peer)
	t.Logf("Session %v\n", peerB.Peer)
219

220 221
	timeout := time.Second
	waitTime := time.Second * 5
222

223 224 225 226 227
	alpha := bg.Next()
	// peerA requests and waits for block alpha
	ctx, _ := context.WithTimeout(context.TODO(), waitTime)
	alphaPromise, err := peerA.Exchange.GetBlocks(ctx, []u.Key{alpha.Key()})
	if err != nil {
228 229
		t.Fatal(err)
	}
230

231 232 233 234
	// peerB announces to the network that he has block alpha
	ctx, _ = context.WithTimeout(context.TODO(), timeout)
	err = peerB.Exchange.HasBlock(ctx, alpha)
	if err != nil {
235 236
		t.Fatal(err)
	}
237

238 239 240 241
	// At some point, peerA should get alpha (or timeout)
	blkrecvd, ok := <-alphaPromise
	if !ok {
		t.Fatal("context timed out and broke promise channel!")
242
	}
243

244 245
	if blkrecvd.Key() != alpha.Key() {
		t.Fatal("Wrong block!")
246
	}
247

248
}
Jeromy's avatar
Jeromy committed
249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 278

func TestBasicBitswap(t *testing.T) {
	net := tn.VirtualNetwork(delay.Fixed(kNetworkDelay))
	rs := mockrouting.NewServer()
	sg := NewSessionGenerator(net, rs)
	bg := blocksutil.NewBlockGenerator()

	t.Log("Test a few nodes trying to get one file with a lot of blocks")

	instances := sg.Instances(2)
	blocks := bg.Blocks(1)
	err := instances[0].Exchange.HasBlock(context.TODO(), blocks[0])
	if err != nil {
		t.Fatal(err)
	}

	ctx, _ := context.WithTimeout(context.TODO(), time.Second*5)
	blk, err := instances[1].Exchange.GetBlock(ctx, blocks[0].Key())
	if err != nil {
		t.Fatal(err)
	}

	t.Log(blk)
	for _, inst := range instances {
		err := inst.Exchange.Close()
		if err != nil {
			t.Fatal(err)
		}
	}
}