bitswap_test.go 6.66 KB
Newer Older
1
package bitswap
Brian Tiger Chow's avatar
Brian Tiger Chow committed
2 3

import (
4
	"bytes"
5
	"sync"
Brian Tiger Chow's avatar
Brian Tiger Chow committed
6 7 8 9
	"testing"
	"time"

	context "github.com/jbenet/go-ipfs/Godeps/_workspace/src/code.google.com/p/go.net/context"
10
	blocks "github.com/jbenet/go-ipfs/blocks"
11
	blocksutil "github.com/jbenet/go-ipfs/blocks/blocksutil"
12
	tn "github.com/jbenet/go-ipfs/exchange/bitswap/testnet"
Brian Tiger Chow's avatar
Brian Tiger Chow committed
13
	mockrouting "github.com/jbenet/go-ipfs/routing/mock"
Jeromy's avatar
Jeromy committed
14
	u "github.com/jbenet/go-ipfs/util"
15
	delay "github.com/jbenet/go-ipfs/util/delay"
16
	testutil "github.com/jbenet/go-ipfs/util/testutil"
Brian Tiger Chow's avatar
Brian Tiger Chow committed
17 18
)

Brian Tiger Chow's avatar
Brian Tiger Chow committed
19 20
// FIXME the tests are really sensitive to the network delay. fix them to work
// well under varying conditions
21 22
const kNetworkDelay = 0 * time.Millisecond

23 24 25
func TestClose(t *testing.T) {
	// TODO
	t.Skip("TODO Bitswap's Close implementation is a WIP")
26
	vnet := tn.VirtualNetwork(delay.Fixed(kNetworkDelay))
Brian Tiger Chow's avatar
Brian Tiger Chow committed
27
	rout := mockrouting.NewServer()
28
	sesgen := NewSessionGenerator(vnet, rout)
Jeromy's avatar
Jeromy committed
29
	defer sesgen.Close()
30
	bgen := blocksutil.NewBlockGenerator()
31 32 33 34

	block := bgen.Next()
	bitswap := sesgen.Next()

35 36
	bitswap.Exchange.Close()
	bitswap.Exchange.GetBlock(context.Background(), block.Key())
37 38
}

Brian Tiger Chow's avatar
Brian Tiger Chow committed
39 40
func TestGetBlockTimeout(t *testing.T) {

41
	net := tn.VirtualNetwork(delay.Fixed(kNetworkDelay))
Brian Tiger Chow's avatar
Brian Tiger Chow committed
42
	rs := mockrouting.NewServer()
43
	g := NewSessionGenerator(net, rs)
Jeromy's avatar
Jeromy committed
44
	defer g.Close()
45

46
	self := g.Next()
47

Brian Tiger Chow's avatar
Brian Tiger Chow committed
48
	ctx, _ := context.WithTimeout(context.Background(), time.Nanosecond)
49
	block := blocks.NewBlock([]byte("block"))
50
	_, err := self.Exchange.GetBlock(ctx, block.Key())
Brian Tiger Chow's avatar
Brian Tiger Chow committed
51 52 53 54 55 56 57 58

	if err != context.DeadlineExceeded {
		t.Fatal("Expected DeadlineExceeded error")
	}
}

func TestProviderForKeyButNetworkCannotFind(t *testing.T) {

59
	net := tn.VirtualNetwork(delay.Fixed(kNetworkDelay))
Brian Tiger Chow's avatar
Brian Tiger Chow committed
60
	rs := mockrouting.NewServer()
61
	g := NewSessionGenerator(net, rs)
Jeromy's avatar
Jeromy committed
62
	defer g.Close()
Brian Tiger Chow's avatar
Brian Tiger Chow committed
63

64
	block := blocks.NewBlock([]byte("block"))
Brian Tiger Chow's avatar
Brian Tiger Chow committed
65
	rs.Client(testutil.NewPeerWithIDString("testing")).Provide(context.Background(), block.Key()) // but not on network
Brian Tiger Chow's avatar
Brian Tiger Chow committed
66

67
	solo := g.Next()
Jeromy's avatar
Jeromy committed
68
	defer solo.Exchange.Close()
Brian Tiger Chow's avatar
Brian Tiger Chow committed
69 70

	ctx, _ := context.WithTimeout(context.Background(), time.Nanosecond)
71
	_, err := solo.Exchange.GetBlock(ctx, block.Key())
Brian Tiger Chow's avatar
Brian Tiger Chow committed
72

Brian Tiger Chow's avatar
Brian Tiger Chow committed
73 74 75 76 77
	if err != context.DeadlineExceeded {
		t.Fatal("Expected DeadlineExceeded error")
	}
}

Brian Tiger Chow's avatar
Brian Tiger Chow committed
78 79 80 81
// TestGetBlockAfterRequesting...

func TestGetBlockFromPeerAfterPeerAnnounces(t *testing.T) {

82
	net := tn.VirtualNetwork(delay.Fixed(kNetworkDelay))
Brian Tiger Chow's avatar
Brian Tiger Chow committed
83
	rs := mockrouting.NewServer()
84
	block := blocks.NewBlock([]byte("block"))
85
	g := NewSessionGenerator(net, rs)
Jeromy's avatar
Jeromy committed
86
	defer g.Close()
Brian Tiger Chow's avatar
Brian Tiger Chow committed
87

88
	hasBlock := g.Next()
Jeromy's avatar
Jeromy committed
89
	defer hasBlock.Exchange.Close()
Brian Tiger Chow's avatar
Brian Tiger Chow committed
90

91
	if err := hasBlock.Blockstore().Put(block); err != nil {
92 93
		t.Fatal(err)
	}
94
	if err := hasBlock.Exchange.HasBlock(context.Background(), block); err != nil {
95 96
		t.Fatal(err)
	}
Brian Tiger Chow's avatar
Brian Tiger Chow committed
97

98
	wantsBlock := g.Next()
Jeromy's avatar
Jeromy committed
99
	defer wantsBlock.Exchange.Close()
Brian Tiger Chow's avatar
Brian Tiger Chow committed
100 101

	ctx, _ := context.WithTimeout(context.Background(), time.Second)
102
	received, err := wantsBlock.Exchange.GetBlock(ctx, block.Key())
Brian Tiger Chow's avatar
Brian Tiger Chow committed
103 104 105 106
	if err != nil {
		t.Log(err)
		t.Fatal("Expected to succeed")
	}
107 108 109 110

	if !bytes.Equal(block.Data, received.Data) {
		t.Fatal("Data doesn't match")
	}
Brian Tiger Chow's avatar
Brian Tiger Chow committed
111 112
}

113
func TestLargeSwarm(t *testing.T) {
114 115 116
	if testing.Short() {
		t.SkipNow()
	}
117
	t.Parallel()
Jeromy's avatar
Jeromy committed
118
	numInstances := 500
119
	numBlocks := 2
120 121
	PerformDistributionTest(t, numInstances, numBlocks)
}
122

123 124 125
func TestLargeFile(t *testing.T) {
	if testing.Short() {
		t.SkipNow()
126
	}
127 128 129 130
	t.Parallel()
	numInstances := 10
	numBlocks := 100
	PerformDistributionTest(t, numInstances, numBlocks)
131 132
}

133
func PerformDistributionTest(t *testing.T, numInstances, numBlocks int) {
134 135 136
	if testing.Short() {
		t.SkipNow()
	}
137
	net := tn.VirtualNetwork(delay.Fixed(kNetworkDelay))
Brian Tiger Chow's avatar
Brian Tiger Chow committed
138
	rs := mockrouting.NewServer()
139
	sg := NewSessionGenerator(net, rs)
Jeromy's avatar
Jeromy committed
140
	defer sg.Close()
141
	bg := blocksutil.NewBlockGenerator()
142 143 144 145 146 147 148 149

	t.Log("Test a few nodes trying to get one file with a lot of blocks")

	instances := sg.Instances(numInstances)
	blocks := bg.Blocks(numBlocks)

	t.Log("Give the blocks to the first instance")

Jeromy's avatar
Jeromy committed
150
	var blkeys []u.Key
151 152
	first := instances[0]
	for _, b := range blocks {
153
		first.Blockstore().Put(b)
Jeromy's avatar
Jeromy committed
154
		blkeys = append(blkeys, b.Key())
155
		first.Exchange.HasBlock(context.Background(), b)
Brian Tiger Chow's avatar
Brian Tiger Chow committed
156
		rs.Client(first.Peer).Provide(context.Background(), b.Key())
157 158 159 160
	}

	t.Log("Distribute!")

Jeromy's avatar
Jeromy committed
161
	wg := sync.WaitGroup{}
162
	for _, inst := range instances {
Jeromy's avatar
Jeromy committed
163 164 165 166 167 168 169 170 171 172
		wg.Add(1)
		go func(inst Instance) {
			defer wg.Done()
			outch, err := inst.Exchange.GetBlocks(context.TODO(), blkeys)
			if err != nil {
				t.Fatal(err)
			}
			for _ = range outch {
			}
		}(inst)
173 174 175 176 177 178 179
	}
	wg.Wait()

	t.Log("Verify!")

	for _, inst := range instances {
		for _, b := range blocks {
180
			if _, err := inst.Blockstore().Get(b.Key()); err != nil {
181 182 183 184 185 186
				t.Fatal(err)
			}
		}
	}
}

187
func getOrFail(bitswap Instance, b *blocks.Block, t *testing.T, wg *sync.WaitGroup) {
188
	if _, err := bitswap.Blockstore().Get(b.Key()); err != nil {
189
		_, err := bitswap.Exchange.GetBlock(context.Background(), b.Key())
190 191 192 193 194 195 196
		if err != nil {
			t.Fatal(err)
		}
	}
	wg.Done()
}

197
// TODO simplify this test. get to the _essence_!
198
func TestSendToWantingPeer(t *testing.T) {
Brian Tiger Chow's avatar
Brian Tiger Chow committed
199 200 201 202
	if testing.Short() {
		t.SkipNow()
	}

203
	net := tn.VirtualNetwork(delay.Fixed(kNetworkDelay))
Brian Tiger Chow's avatar
Brian Tiger Chow committed
204
	rs := mockrouting.NewServer()
205
	sg := NewSessionGenerator(net, rs)
Jeromy's avatar
Jeromy committed
206
	defer sg.Close()
207
	bg := blocksutil.NewBlockGenerator()
208

209 210 211
	oldVal := rebroadcastDelay
	rebroadcastDelay = time.Second / 2
	defer func() { rebroadcastDelay = oldVal }()
212

213 214
	peerA := sg.Next()
	peerB := sg.Next()
215

216 217
	t.Logf("Session %v\n", peerA.Peer)
	t.Logf("Session %v\n", peerB.Peer)
218

219 220
	timeout := time.Second
	waitTime := time.Second * 5
221

222 223 224 225 226
	alpha := bg.Next()
	// peerA requests and waits for block alpha
	ctx, _ := context.WithTimeout(context.TODO(), waitTime)
	alphaPromise, err := peerA.Exchange.GetBlocks(ctx, []u.Key{alpha.Key()})
	if err != nil {
227 228
		t.Fatal(err)
	}
229

230 231 232 233
	// peerB announces to the network that he has block alpha
	ctx, _ = context.WithTimeout(context.TODO(), timeout)
	err = peerB.Exchange.HasBlock(ctx, alpha)
	if err != nil {
234 235
		t.Fatal(err)
	}
236

237 238 239 240
	// At some point, peerA should get alpha (or timeout)
	blkrecvd, ok := <-alphaPromise
	if !ok {
		t.Fatal("context timed out and broke promise channel!")
241
	}
242

243 244
	if blkrecvd.Key() != alpha.Key() {
		t.Fatal("Wrong block!")
245
	}
246

247
}
Jeromy's avatar
Jeromy committed
248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 277

func TestBasicBitswap(t *testing.T) {
	net := tn.VirtualNetwork(delay.Fixed(kNetworkDelay))
	rs := mockrouting.NewServer()
	sg := NewSessionGenerator(net, rs)
	bg := blocksutil.NewBlockGenerator()

	t.Log("Test a few nodes trying to get one file with a lot of blocks")

	instances := sg.Instances(2)
	blocks := bg.Blocks(1)
	err := instances[0].Exchange.HasBlock(context.TODO(), blocks[0])
	if err != nil {
		t.Fatal(err)
	}

	ctx, _ := context.WithTimeout(context.TODO(), time.Second*5)
	blk, err := instances[1].Exchange.GetBlock(ctx, blocks[0].Key())
	if err != nil {
		t.Fatal(err)
	}

	t.Log(blk)
	for _, inst := range instances {
		err := inst.Exchange.Close()
		if err != nil {
			t.Fatal(err)
		}
	}
}