addcat_test.go 4.5 KB
Newer Older
1 2 3 4 5 6
package epictest

import (
	"bytes"
	"fmt"
	"io"
Brian Tiger Chow's avatar
Brian Tiger Chow committed
7
	"math"
8 9 10 11
	"os"
	"testing"
	"time"

12
	context "github.com/jbenet/go-ipfs/Godeps/_workspace/src/code.google.com/p/go.net/context"
13
	"github.com/jbenet/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-random"
14 15 16 17 18 19
	blockservice "github.com/jbenet/go-ipfs/blockservice"
	bitswap "github.com/jbenet/go-ipfs/exchange/bitswap"
	tn "github.com/jbenet/go-ipfs/exchange/bitswap/testnet"
	importer "github.com/jbenet/go-ipfs/importer"
	chunk "github.com/jbenet/go-ipfs/importer/chunk"
	merkledag "github.com/jbenet/go-ipfs/merkledag"
Brian Tiger Chow's avatar
Brian Tiger Chow committed
20
	mocknet "github.com/jbenet/go-ipfs/net/mock"
21 22 23 24 25 26 27
	path "github.com/jbenet/go-ipfs/path"
	mockrouting "github.com/jbenet/go-ipfs/routing/mock"
	uio "github.com/jbenet/go-ipfs/unixfs/io"
	util "github.com/jbenet/go-ipfs/util"
	errors "github.com/jbenet/go-ipfs/util/debugerror"
)

Brian Tiger Chow's avatar
Brian Tiger Chow committed
28 29
// kSeed is the fixed seed that RandomBytes passes to
// random.WritePseudoRandomBytes. Presumably a constant seed makes the
// generated payloads reproducible across runs — confirm against go-random.
const kSeed = 1

Brian Tiger Chow's avatar
Brian Tiger Chow committed
30
func Test1KBInstantaneous(t *testing.T) {
31 32 33 34 35 36
	conf := Config{
		NetworkLatency:    0,
		RoutingLatency:    0,
		BlockstoreLatency: 0,
	}

Brian Tiger Chow's avatar
Brian Tiger Chow committed
37 38 39
	if err := AddCatBytes(RandomBytes(1*KB), conf); err != nil {
		t.Fatal(err)
	}
40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67
}

// TestDegenerateSlowBlockstore runs the AddCatPowers size sweep (bound 128)
// with a 50ms blockstore latency and every other latency left at its zero
// value. It only runs when SkipUnlessEpic lets it through.
func TestDegenerateSlowBlockstore(t *testing.T) {
	SkipUnlessEpic(t)

	const latency = 50 * time.Millisecond
	err := AddCatPowers(Config{BlockstoreLatency: latency}, 128)
	if err != nil {
		t.Fatal(err)
	}
}

// TestDegenerateSlowNetwork runs the AddCatPowers size sweep (bound 128)
// with a 400ms network latency and every other latency left at its zero
// value. It only runs when SkipUnlessEpic lets it through.
func TestDegenerateSlowNetwork(t *testing.T) {
	SkipUnlessEpic(t)

	const latency = 400 * time.Millisecond
	err := AddCatPowers(Config{NetworkLatency: latency}, 128)
	if err != nil {
		t.Fatal(err)
	}
}

// TestDegenerateSlowRouting runs the AddCatPowers size sweep (bound 128)
// with a 400ms routing latency and every other latency left at its zero
// value. It only runs when SkipUnlessEpic lets it through.
func TestDegenerateSlowRouting(t *testing.T) {
	SkipUnlessEpic(t)

	const latency = 400 * time.Millisecond
	err := AddCatPowers(Config{RoutingLatency: latency}, 128)
	if err != nil {
		t.Fatal(err)
	}
}

func Test100MBMacbookCoastToCoast(t *testing.T) {
	SkipUnlessEpic(t)
68 69
	conf := Config{}.Network_NYtoSF().Blockstore_SlowSSD2014().Routing_Slow()
	if err := AddCatBytes(RandomBytes(100*1024*1024), conf); err != nil {
70 71 72 73 74 75 76 77
		t.Fatal(err)
	}
}

func AddCatPowers(conf Config, megabytesMax int64) error {
	var i int64
	for i = 1; i < megabytesMax; i = i * 2 {
		fmt.Printf("%d MB\n", i)
78
		if err := AddCatBytes(RandomBytes(i*1024*1024), conf); err != nil {
79 80 81 82 83 84
			return err
		}
	}
	return nil
}

85 86 87 88 89 90 91
// RandomBytes returns the bytes that random.WritePseudoRandomBytes writes
// into an in-memory buffer when asked for n bytes with seed kSeed.
func RandomBytes(n int64) []byte {
	buf := new(bytes.Buffer)
	random.WritePseudoRandomBytes(n, buf, kSeed)
	return buf.Bytes()
}

func AddCatBytes(data []byte, conf Config) error {
Brian Tiger Chow's avatar
Brian Tiger Chow committed
92 93
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()
Brian Tiger Chow's avatar
Brian Tiger Chow committed
94 95 96 97 98
	mn := mocknet.New(ctx)
	// defer mn.Close() FIXME does mocknet require clean-up
	mn.SetLinkDefaults(mocknet.LinkOptions{
		Latency:   conf.NetworkLatency,
		Bandwidth: math.MaxInt32, // TODO add to conf
99
	})
Brian Tiger Chow's avatar
Brian Tiger Chow committed
100 101
	dhtNetwork := mockrouting.NewDHTNetwork(mn)
	net, err := tn.StreamNet(ctx, mn, dhtNetwork)
102 103 104 105
	if err != nil {
		return errors.Wrap(err)
	}
	sessionGenerator := bitswap.NewSessionGenerator(net)
106
	defer sessionGenerator.Close()
107 108 109

	adder := sessionGenerator.Next()
	catter := sessionGenerator.Next()
Brian Tiger Chow's avatar
Brian Tiger Chow committed
110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130
	// catter.Routing.Update(context.TODO(), adder.Peer)

	peers := mn.Peers()
	if len(peers) != 2 {
		return errors.New("peers not in network")
	}

	for _, i := range peers {
		for _, j := range peers {
			if i == j {
				continue
			}
			if _, err := mn.LinkPeers(i, j); err != nil {
				return err
			}
			if err := mn.ConnectPeers(i, j); err != nil {
				return err
			}
		}
	}

131 132 133
	catter.SetBlockstoreLatency(conf.BlockstoreLatency)

	adder.SetBlockstoreLatency(0) // disable blockstore latency during add operation
134
	keyAdded, err := add(adder, bytes.NewReader(data))
135 136 137 138 139 140 141 142 143 144 145 146 147
	if err != nil {
		return err
	}
	adder.SetBlockstoreLatency(conf.BlockstoreLatency) // add some blockstore delay to make the catter wait

	readerCatted, err := cat(catter, keyAdded)
	if err != nil {
		return err
	}

	// verify
	var bufout bytes.Buffer
	io.Copy(&bufout, readerCatted)
148
	if 0 != bytes.Compare(bufout.Bytes(), data) {
149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180
		return errors.New("catted data does not match added data")
	}
	return nil
}

// cat resolves key k against a DAG service backed by catter's blockstore and
// exchange, and returns a reader over the resolved node. Any error from path
// resolution or from constructing the reader is returned unchanged.
func cat(catter bitswap.Instance, k util.Key) (io.Reader, error) {
	bs := &blockservice.BlockService{catter.Blockstore(), catter.Exchange}
	dag := merkledag.NewDAGService(bs)
	resolver := &path.Resolver{dag}

	root, err := resolver.ResolvePath(k.String())
	if err != nil {
		return nil, err
	}
	return uio.NewDagReader(root, dag)
}

// add imports the contents of r into a DAG service backed by adder's
// blockstore and exchange, using chunk.DefaultSplitter, and returns the key
// of the resulting root node. On an import error it returns the empty key
// together with that error.
func add(adder bitswap.Instance, r io.Reader) (util.Key, error) {
	bs := &blockservice.BlockService{adder.Blockstore(), adder.Exchange}
	dag := merkledag.NewDAGService(bs)

	root, err := importer.BuildDagFromReader(r, dag, nil, chunk.DefaultSplitter)
	if err != nil {
		return "", err
	}
	return root.Key()
}

// SkipUnlessEpic skips the calling test unless the IPFS_EPIC_TEST environment
// variable is set to a non-empty value.
func SkipUnlessEpic(t *testing.T) {
	if enabled := os.Getenv("IPFS_EPIC_TEST"); len(enabled) > 0 {
		return
	}
	t.SkipNow()
}