Unverified Commit 42f195e9 authored by Alex Cruikshank, committed by GitHub

testground test for graphsync (#132)

* initial testplan with parameterizable composition

* add disk based store option
Co-authored-by: acruikshank <acruikshank@example.com>
parent 4372a80f
module github.com/ipfs/go-graphsync/testplans/graphsync
go 1.14
require (
github.com/dustin/go-humanize v1.0.0
github.com/hannahhoward/all-selector v0.1.0
github.com/ipfs/go-blockservice v0.1.3
github.com/ipfs/go-cid v0.0.6
github.com/ipfs/go-datastore v0.4.4
github.com/ipfs/go-ds-badger v0.2.4 // required by the badgerds import in main.go; version assumed
github.com/ipfs/go-graphsync v0.1.2
github.com/ipfs/go-ipfs-blockstore v0.1.4
github.com/ipfs/go-ipfs-chunker v0.0.5
github.com/ipfs/go-ipfs-exchange-offline v0.0.1
github.com/ipfs/go-ipfs-files v0.0.8
github.com/ipfs/go-ipld-format v0.2.0
github.com/ipfs/go-merkledag v0.3.1
github.com/ipfs/go-unixfs v0.2.4
github.com/ipld/go-ipld-prime v0.4.0
github.com/kr/text v0.2.0 // indirect
github.com/libp2p/go-libp2p v0.10.0
github.com/libp2p/go-libp2p-core v0.6.0
github.com/libp2p/go-libp2p-noise v0.1.1
github.com/libp2p/go-libp2p-secio v0.2.2
github.com/libp2p/go-libp2p-tls v0.1.3
github.com/niemeyer/pretty v0.0.0-20200227124842-a10e7caefd8e // indirect
github.com/testground/sdk-go v0.2.7-0.20201112151952-8ee00c80c3ec
golang.org/x/sync v0.0.0-20200625203802-6e8e738ad208
golang.org/x/sys v0.0.0-20200625212154-ddb9806d33ae // indirect
google.golang.org/protobuf v1.25.0 // indirect
gopkg.in/check.v1 v1.0.0-20200227125254-8fa46927fb4f // indirect
gopkg.in/yaml.v2 v2.2.8 // indirect
)
package main
import (
"context"
"crypto/rand"
"fmt"
"io"
"os"
"path/filepath"
goruntime "runtime"
"strings"
"time"
badgerds "github.com/ipfs/go-ds-badger"
"github.com/dustin/go-humanize"
allselector "github.com/hannahhoward/all-selector"
"github.com/ipfs/go-blockservice"
"github.com/ipfs/go-cid"
ds "github.com/ipfs/go-datastore"
dss "github.com/ipfs/go-datastore/sync"
"github.com/ipfs/go-graphsync/storeutil"
blockstore "github.com/ipfs/go-ipfs-blockstore"
chunk "github.com/ipfs/go-ipfs-chunker"
offline "github.com/ipfs/go-ipfs-exchange-offline"
files "github.com/ipfs/go-ipfs-files"
format "github.com/ipfs/go-ipld-format"
"github.com/ipfs/go-merkledag"
"github.com/ipfs/go-unixfs/importer/balanced"
ihelper "github.com/ipfs/go-unixfs/importer/helpers"
cidlink "github.com/ipld/go-ipld-prime/linking/cid"
"github.com/libp2p/go-libp2p-core/metrics"
"github.com/testground/sdk-go/network"
"golang.org/x/sync/errgroup"
gs "github.com/ipfs/go-graphsync"
gsi "github.com/ipfs/go-graphsync/impl"
gsnet "github.com/ipfs/go-graphsync/network"
"github.com/libp2p/go-libp2p"
"github.com/libp2p/go-libp2p-core/host"
"github.com/libp2p/go-libp2p-core/peer"
noise "github.com/libp2p/go-libp2p-noise"
secio "github.com/libp2p/go-libp2p-secio"
tls "github.com/libp2p/go-libp2p-tls"
"github.com/testground/sdk-go/run"
"github.com/testground/sdk-go/runtime"
"github.com/testground/sdk-go/sync"
)
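// testcases maps test case names, as declared in the plan manifest, to their
// entry points; this plan defines a single "stress" case.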
var testcases = map[string]interface{}{
"stress": run.InitializedTestCaseFn(runStress),
}
func main() {
run.InvokeMap(testcases)
}
type networkParams struct {
latency time.Duration
bandwidth uint64
}
func (p networkParams) String() string {
return fmt.Sprintf("<lat: %s, bandwidth: %d>", p.latency, p.bandwidth)
}
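// runStress is the entry point for the "stress" test case. Each instance
// builds a libp2p host and a graphsync instance backed by a local blockstore,
// then acts as either the single provider or one of the requestors, depending
// on its test group.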
func runStress(runenv *runtime.RunEnv, initCtx *run.InitContext) error {
var (
size = runenv.SizeParam("size")
concurrency = runenv.IntParam("concurrency")
networkParams = parseNetworkConfig(runenv)
)
runenv.RecordMessage("started test instance")
runenv.RecordMessage("network params: %v", networkParams)
ctx, cancel := context.WithTimeout(context.Background(), 30*time.Minute)
defer cancel()
initCtx.MustWaitAllInstancesInitialized(ctx)
host, peers, _ := makeHost(ctx, runenv, initCtx)
defer host.Close()
var (
// make datastore, blockstore, dag service, graphsync
bs = blockstore.NewBlockstore(dss.MutexWrap(createDatastore(runenv.BooleanParam("disk_store"))))
dagsrv = merkledag.NewDAGService(blockservice.New(bs, offline.Exchange(bs)))
gsync = gsi.New(ctx,
gsnet.NewFromLibp2pHost(host),
storeutil.LoaderForBlockstore(bs),
storeutil.StorerForBlockstore(bs),
)
)
defer initCtx.SyncClient.MustSignalAndWait(ctx, "done", runenv.TestInstanceCount)
switch runenv.TestGroupID {
case "providers":
if runenv.TestGroupInstanceCount > 1 {
panic("test case only supports one provider")
}
runenv.RecordMessage("we are the provider")
defer runenv.RecordMessage("done provider")
gsync.RegisterIncomingRequestHook(func(p peer.ID, request gs.RequestData, hookActions gs.IncomingRequestHookActions) {
hookActions.ValidateRequest()
})
return runProvider(ctx, runenv, initCtx, dagsrv, size, networkParams, concurrency)
case "requestors":
runenv.RecordMessage("we are the requestor")
defer runenv.RecordMessage("done requestor")
p := *peers[0]
if err := host.Connect(ctx, p); err != nil {
return err
}
runenv.RecordMessage("done dialling provider")
return runRequestor(ctx, runenv, initCtx, gsync, p, dagsrv, networkParams, concurrency, size)
default:
panic(fmt.Sprintf("unsupported group ID: %s\n", runenv.TestGroupID))
}
}
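// parseNetworkConfig expands the "latencies" and "bandwidths" parameters into
// the cartesian product of network shapes to iterate over, including
// zero-valued (unlimited) control combinations.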
func parseNetworkConfig(runenv *runtime.RunEnv) []networkParams {
var (
bandwidths = runenv.SizeArrayParam("bandwidths")
latencies []time.Duration
)
lats := runenv.StringArrayParam("latencies")
for _, l := range lats {
d, err := time.ParseDuration(l)
if err != nil {
panic(err)
}
latencies = append(latencies, d)
}
// prepend bandwidth=0 and latency=0 zero values; the first iteration will
// be a control iteration. The sidecar interprets zero values as no
// limitation on that attribute.
bandwidths = append([]uint64{0}, bandwidths...)
latencies = append([]time.Duration{0}, latencies...)
var ret []networkParams
for _, bandwidth := range bandwidths {
for _, latency := range latencies {
ret = append(ret, networkParams{
latency: latency,
bandwidth: bandwidth,
})
}
}
return ret
}
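// runRequestor executes one round per network shape: it receives the round's
// root CIDs from the provider, waits for the network to be reconfigured, then
// fetches every CID over graphsync concurrently, recording each transfer
// duration as a result point.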
func runRequestor(ctx context.Context, runenv *runtime.RunEnv, initCtx *run.InitContext, gsync gs.GraphExchange, p peer.AddrInfo, dagsrv format.DAGService, networkParams []networkParams, concurrency int, size uint64) error {
var (
cids []cid.Cid
// create a selector for the whole UnixFS dag
sel = allselector.AllSelector
)
for round, np := range networkParams {
var (
topicCid = sync.NewTopic(fmt.Sprintf("cid-%d", round), []cid.Cid{})
stateNext = sync.State(fmt.Sprintf("next-%d", round))
stateNet = sync.State(fmt.Sprintf("network-configured-%d", round))
)
// wait for all instances to be ready for the next state.
initCtx.SyncClient.MustSignalAndWait(ctx, stateNext, runenv.TestInstanceCount)
// clean up previous CIDs to attempt to free memory
// TODO does this work?
_ = dagsrv.RemoveMany(ctx, cids)
runenv.RecordMessage("===== ROUND %d: latency=%s, bandwidth=%d =====", round, np.latency, np.bandwidth)
sctx, scancel := context.WithCancel(ctx)
cidCh := make(chan []cid.Cid, 1)
initCtx.SyncClient.MustSubscribe(sctx, topicCid, cidCh)
cids = <-cidCh
scancel()
// run GC to get accurate-ish stats.
goruntime.GC()
goruntime.GC()
<-initCtx.SyncClient.MustBarrier(ctx, stateNet, 1).C
errgrp, grpctx := errgroup.WithContext(ctx)
for _, c := range cids {
c := c // capture
np := np // capture
errgrp.Go(func() error {
// make a go-ipld-prime link for the root UnixFS node
clink := cidlink.Link{Cid: c}
// execute the traversal.
runenv.RecordMessage("\t>>> requesting CID %s", c)
start := time.Now()
_, errCh := gsync.Request(grpctx, p.ID, clink, sel)
for err := range errCh {
return err
}
dur := time.Since(start)
runenv.RecordMessage("\t<<< request complete with no errors")
runenv.RecordMessage("***** ROUND %d observed duration (lat=%s,bw=%d): %s", round, np.latency, np.bandwidth, dur)
measurement := fmt.Sprintf("duration.sec,lat=%s,bw=%s,concurrency=%d,size=%s", np.latency, humanize.IBytes(np.bandwidth), concurrency, humanize.Bytes(size))
measurement = strings.Replace(measurement, " ", "", -1)
runenv.R().RecordPoint(measurement, float64(dur)/float64(time.Second))
// verify that we have the CID now.
if node, err := dagsrv.Get(grpctx, c); err != nil {
return err
} else if node == nil {
return fmt.Errorf("finished graphsync request, but CID not in store")
}
return nil
})
}
if err := errgrp.Wait(); err != nil {
return err
}
}
return nil
}
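// runProvider executes one round per network shape: it imports as many random
// unixfs files as the concurrency level, publishes their root CIDs to the
// round's sync topic, and applies the round's latency and bandwidth limits to
// the data network.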
func runProvider(ctx context.Context, runenv *runtime.RunEnv, initCtx *run.InitContext, dagsrv format.DAGService, size uint64, networkParams []networkParams, concurrency int) error {
var (
cids []cid.Cid
bufferedDS = format.NewBufferedDAG(ctx, dagsrv)
)
for round, np := range networkParams {
var (
topicCid = sync.NewTopic(fmt.Sprintf("cid-%d", round), []cid.Cid{})
stateNext = sync.State(fmt.Sprintf("next-%d", round))
stateNet = sync.State(fmt.Sprintf("network-configured-%d", round))
)
// wait for all instances to be ready for the next state.
initCtx.SyncClient.MustSignalAndWait(ctx, stateNext, runenv.TestInstanceCount)
// remove the previous CIDs from the dag service; hopefully this
// will delete them from the store and free up memory.
for _, c := range cids {
_ = dagsrv.Remove(ctx, c)
}
cids = cids[:0]
runenv.RecordMessage("===== ROUND %d: latency=%s, bandwidth=%d =====", round, np.latency, np.bandwidth)
// generate as many random files as the concurrency level.
for i := 0; i < concurrency; i++ {
// file with random data
file := files.NewReaderFile(io.LimitReader(rand.Reader, int64(size)))
unixfsChunkSize := uint64(1) << runenv.IntParam("chunk_size")
unixfsLinksPerLevel := runenv.IntParam("links_per_level")
params := ihelper.DagBuilderParams{
Maxlinks: unixfsLinksPerLevel,
RawLeaves: runenv.BooleanParam("raw_leaves"),
CidBuilder: nil,
Dagserv: bufferedDS,
}
db, err := params.New(chunk.NewSizeSplitter(file, int64(unixfsChunkSize)))
if err != nil {
return fmt.Errorf("unable to setup dag builder: %w", err)
}
node, err := balanced.Layout(db)
if err != nil {
return fmt.Errorf("unable to create unix fs node: %w", err)
}
cids = append(cids, node.Cid())
}
if err := bufferedDS.Commit(); err != nil {
return fmt.Errorf("unable to commit unix fs node: %w", err)
}
// run GC to get accurate-ish stats.
goruntime.GC()
goruntime.GC()
runenv.RecordMessage("\tCIDs are: %v", cids)
initCtx.SyncClient.MustPublish(ctx, topicCid, cids)
runenv.RecordMessage("\tconfiguring network for round %d", round)
initCtx.NetClient.MustConfigureNetwork(ctx, &network.Config{
Network: "default",
Enable: true,
Default: network.LinkShape{
Latency: np.latency,
Bandwidth: np.bandwidth * 8, // the bandwidth param is in bytes/s; the link shape expects bits/s
},
CallbackState: stateNet,
CallbackTarget: 1,
})
runenv.RecordMessage("\tnetwork configured for round %d", round)
}
return nil
}
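// makeHost constructs a libp2p host with the configured security transport and
// a bandwidth counter attached, publishes this instance's AddrInfo to the
// "peers" sync topic, and collects the AddrInfos of all other instances.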
func makeHost(ctx context.Context, runenv *runtime.RunEnv, initCtx *run.InitContext) (host.Host, []*peer.AddrInfo, *metrics.BandwidthCounter) {
secureChannel := runenv.StringParam("secure_channel")
var security libp2p.Option
switch secureChannel {
case "noise":
security = libp2p.Security(noise.ID, noise.New)
case "secio":
security = libp2p.Security(secio.ID, secio.New)
case "tls":
security = libp2p.Security(tls.ID, tls.New)
}
// ☎️ Let's construct the libp2p node.
ip := initCtx.NetClient.MustGetDataNetworkIP()
listenAddr := fmt.Sprintf("/ip4/%s/tcp/0", ip)
bwcounter := metrics.NewBandwidthCounter()
host, err := libp2p.New(ctx,
security,
libp2p.ListenAddrStrings(listenAddr),
libp2p.BandwidthReporter(bwcounter),
)
if err != nil {
panic(fmt.Sprintf("failed to instantiate libp2p instance: %s", err))
}
// Record our listen addrs.
runenv.RecordMessage("my listen addrs: %v", host.Addrs())
// Obtain our own address info, and use the sync service to publish it to a
// 'peersTopic' topic, where others will read from.
var (
id = host.ID()
ai = &peer.AddrInfo{ID: id, Addrs: host.Addrs()}
// the peers topic where all instances will advertise their AddrInfo.
peersTopic = sync.NewTopic("peers", new(peer.AddrInfo))
// initialize a slice to store the AddrInfos of all other peers in the run.
peers = make([]*peer.AddrInfo, 0, runenv.TestInstanceCount-1)
)
// Publish our own.
initCtx.SyncClient.MustPublish(ctx, peersTopic, ai)
// Now subscribe to the peers topic and consume all addresses, storing them
// in the peers slice.
peersCh := make(chan *peer.AddrInfo)
sctx, scancel := context.WithCancel(ctx)
defer scancel()
sub := initCtx.SyncClient.MustSubscribe(sctx, peersTopic, peersCh)
// Receive the expected number of AddrInfos.
for len(peers) < cap(peers) {
select {
case ai := <-peersCh:
if ai.ID == id {
continue // skip over ourselves.
}
peers = append(peers, ai)
case err := <-sub.Done():
panic(err)
}
}
return host, peers, bwcounter
}
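// createDatastore returns an in-memory map datastore by default, or a Badger
// datastore rooted in a temporary directory when diskStore is true.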
func createDatastore(diskStore bool) ds.Datastore {
if !diskStore {
return ds.NewMapDatastore()
}
// create temporary directory for badger datastore
path := filepath.Join(os.TempDir(), "datastore")
if _, err := os.Stat(path); os.IsNotExist(err) {
if err := os.MkdirAll(path, 0755); err != nil {
panic(err)
}
} else if err != nil {
panic(err)
}
// create disk based badger datastore
defopts := badgerds.DefaultOptions
defopts.SyncWrites = false
defopts.Truncate = true
datastore, err := badgerds.NewDatastore(path, &defopts)
if err != nil {
panic(err)
}
return datastore
}
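# Testground plan manifest: declares the plan name, supported builders and
# runners, and the parameters of the "stress" test case.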
name = "graphsync"
[builders]
"docker:go" = { enabled = true, enable_go_build_cache = true }
"exec:go" = { enabled = true }
[runners]
"local:docker" = { enabled = true }
"local:exec" = { enabled = true }
"cluster:k8s" = { enabled = true }
[global.build_config]
enable_go_build_cache = true
[[testcases]]
name = "stress"
instances = { min = 2, max = 10000, default = 2 }
[testcases.params]
size = { type = "int", desc = "size of file to transfer, in human-friendly form", default = "1MiB" }
secure_channel = { type = "enum", desc = "secure channel used", values = ["secio", "noise", "tls"], default = "noise" }
latencies = { type = "string", desc = "latencies to try with; JSON array of duration strings", default = '["100ms", "200ms", "300ms"]' }
bandwidths = { type = "string", desc = "bandwidths (egress bytes/s) to try with; JSON array of humanized sizes", default = '["10M", "1M", "512kb"]' }
concurrency = { type = "int", desc = "concurrency level", default = "1" }
chunk_size = { type = "int", desc = "unixfs chunk size (power of 2)", default = "20" }
links_per_level = { type = "int", desc = "unixfs links per level", default = "1024" }
raw_leaves = { type = "bool", desc = "should unixfs leaves be left unwrapped", default = "true" }
disk_store = { type = "bool", desc = "should data be stored on disk (true) or in memory (false)", default = "false" }
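# Example composition: runs the "stress" case with one provider and one
# requestor, built with docker:go and run on local:docker.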
[metadata]
name = "stress"
[global]
plan = "graphsync"
case = "stress"
total_instances = 2
builder = "docker:go"
runner = "local:docker"
[global.run.test_params]
size = "10MB"
latencies = '["50ms", "100ms", "200ms"]'
bandwidths = '["32MiB", "16MiB", "8MiB", "4MiB", "1MiB"]'
concurrency = "10"
chunk_size = "20"
links_per_level = "1024"
raw_leaves = "true"
disk_store = "true"
[[groups]]
id = "providers"
instances = { count = 1 }
[[groups]]
id = "requestors"
instances = { count = 1 }