addcat_test.go 4.56 KB
Newer Older
1 2 3 4 5 6
package epictest

import (
	"bytes"
	"fmt"
	"io"
Brian Tiger Chow's avatar
Brian Tiger Chow committed
7
	"math"
8 9 10 11
	"os"
	"testing"
	"time"

12
	context "github.com/jbenet/go-ipfs/Godeps/_workspace/src/code.google.com/p/go.net/context"
13
	"github.com/jbenet/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-random"
14 15 16 17 18 19
	blockservice "github.com/jbenet/go-ipfs/blockservice"
	bitswap "github.com/jbenet/go-ipfs/exchange/bitswap"
	tn "github.com/jbenet/go-ipfs/exchange/bitswap/testnet"
	importer "github.com/jbenet/go-ipfs/importer"
	chunk "github.com/jbenet/go-ipfs/importer/chunk"
	merkledag "github.com/jbenet/go-ipfs/merkledag"
Brian Tiger Chow's avatar
Brian Tiger Chow committed
20
	mocknet "github.com/jbenet/go-ipfs/net/mock"
21 22 23 24 25 26 27
	path "github.com/jbenet/go-ipfs/path"
	mockrouting "github.com/jbenet/go-ipfs/routing/mock"
	uio "github.com/jbenet/go-ipfs/unixfs/io"
	util "github.com/jbenet/go-ipfs/util"
	errors "github.com/jbenet/go-ipfs/util/debugerror"
)

Brian Tiger Chow's avatar
Brian Tiger Chow committed
28 29
const kSeed = 1

Brian Tiger Chow's avatar
Brian Tiger Chow committed
30
func Test1KBInstantaneous(t *testing.T) {
31 32 33 34 35 36
	conf := Config{
		NetworkLatency:    0,
		RoutingLatency:    0,
		BlockstoreLatency: 0,
	}

Brian Tiger Chow's avatar
Brian Tiger Chow committed
37 38 39
	if err := AddCatBytes(RandomBytes(1*KB), conf); err != nil {
		t.Fatal(err)
	}
40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67
}

func TestDegenerateSlowBlockstore(t *testing.T) {
	SkipUnlessEpic(t)
	conf := Config{BlockstoreLatency: 50 * time.Millisecond}
	if err := AddCatPowers(conf, 128); err != nil {
		t.Fatal(err)
	}
}

func TestDegenerateSlowNetwork(t *testing.T) {
	SkipUnlessEpic(t)
	conf := Config{NetworkLatency: 400 * time.Millisecond}
	if err := AddCatPowers(conf, 128); err != nil {
		t.Fatal(err)
	}
}

func TestDegenerateSlowRouting(t *testing.T) {
	SkipUnlessEpic(t)
	conf := Config{RoutingLatency: 400 * time.Millisecond}
	if err := AddCatPowers(conf, 128); err != nil {
		t.Fatal(err)
	}
}

func Test100MBMacbookCoastToCoast(t *testing.T) {
	SkipUnlessEpic(t)
68 69
	conf := Config{}.Network_NYtoSF().Blockstore_SlowSSD2014().Routing_Slow()
	if err := AddCatBytes(RandomBytes(100*1024*1024), conf); err != nil {
70 71 72 73 74 75 76 77
		t.Fatal(err)
	}
}

func AddCatPowers(conf Config, megabytesMax int64) error {
	var i int64
	for i = 1; i < megabytesMax; i = i * 2 {
		fmt.Printf("%d MB\n", i)
78
		if err := AddCatBytes(RandomBytes(i*1024*1024), conf); err != nil {
79 80 81 82 83 84
			return err
		}
	}
	return nil
}

85 86 87 88 89 90 91
func RandomBytes(n int64) []byte {
	var data bytes.Buffer
	random.WritePseudoRandomBytes(n, &data, kSeed)
	return data.Bytes()
}

func AddCatBytes(data []byte, conf Config) error {
Brian Tiger Chow's avatar
Brian Tiger Chow committed
92 93
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()
Brian Tiger Chow's avatar
Brian Tiger Chow committed
94 95 96 97
	mn := mocknet.New(ctx)
	// defer mn.Close() FIXME does mocknet require clean-up
	mn.SetLinkDefaults(mocknet.LinkOptions{
		Latency:   conf.NetworkLatency,
98 99
		// TODO add to conf. This is tricky because we want 0 values to be functional.
		Bandwidth: math.MaxInt32,
100
	})
Brian Tiger Chow's avatar
Brian Tiger Chow committed
101 102
	dhtNetwork := mockrouting.NewDHTNetwork(mn)
	net, err := tn.StreamNet(ctx, mn, dhtNetwork)
103 104 105 106
	if err != nil {
		return errors.Wrap(err)
	}
	sessionGenerator := bitswap.NewSessionGenerator(net)
107
	defer sessionGenerator.Close()
108 109 110

	adder := sessionGenerator.Next()
	catter := sessionGenerator.Next()
Brian Tiger Chow's avatar
Brian Tiger Chow committed
111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131
	// catter.Routing.Update(context.TODO(), adder.Peer)

	peers := mn.Peers()
	if len(peers) != 2 {
		return errors.New("peers not in network")
	}

	for _, i := range peers {
		for _, j := range peers {
			if i == j {
				continue
			}
			if _, err := mn.LinkPeers(i, j); err != nil {
				return err
			}
			if err := mn.ConnectPeers(i, j); err != nil {
				return err
			}
		}
	}

132 133 134
	catter.SetBlockstoreLatency(conf.BlockstoreLatency)

	adder.SetBlockstoreLatency(0) // disable blockstore latency during add operation
135
	keyAdded, err := add(adder, bytes.NewReader(data))
136 137 138 139 140 141 142 143 144 145 146 147 148
	if err != nil {
		return err
	}
	adder.SetBlockstoreLatency(conf.BlockstoreLatency) // add some blockstore delay to make the catter wait

	readerCatted, err := cat(catter, keyAdded)
	if err != nil {
		return err
	}

	// verify
	var bufout bytes.Buffer
	io.Copy(&bufout, readerCatted)
149
	if 0 != bytes.Compare(bufout.Bytes(), data) {
150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181
		return errors.New("catted data does not match added data")
	}
	return nil
}

func cat(catter bitswap.Instance, k util.Key) (io.Reader, error) {
	catterdag := merkledag.NewDAGService(&blockservice.BlockService{catter.Blockstore(), catter.Exchange})
	nodeCatted, err := (&path.Resolver{catterdag}).ResolvePath(k.String())
	if err != nil {
		return nil, err
	}
	return uio.NewDagReader(nodeCatted, catterdag)
}

func add(adder bitswap.Instance, r io.Reader) (util.Key, error) {
	nodeAdded, err := importer.BuildDagFromReader(
		r,
		merkledag.NewDAGService(&blockservice.BlockService{adder.Blockstore(), adder.Exchange}),
		nil,
		chunk.DefaultSplitter,
	)
	if err != nil {
		return "", err
	}
	return nodeAdded.Key()
}

func SkipUnlessEpic(t *testing.T) {
	if os.Getenv("IPFS_EPIC_TEST") == "" {
		t.SkipNow()
	}
}