Unverified Commit fcb8c292 authored by Hannah Howard's avatar Hannah Howard Committed by GitHub

Add optional HTTP comparison (#153)

* feat(testplan): add http comparison

* feat(testplan): add configuration & response timing

Add configuration for memory buffering and disabling unlimited options, also add timing of responses
Co-authored-by: default avataracruikshank <acruikshank@example.com>
parent 688dcf34
......@@ -27,6 +27,7 @@ require (
github.com/libp2p/go-libp2p-secio v0.2.2
github.com/libp2p/go-libp2p-tls v0.1.3
github.com/libp2p/go-sockaddr v0.1.0 // indirect
github.com/multiformats/go-multiaddr v0.3.1
github.com/niemeyer/pretty v0.0.0-20200227124842-a10e7caefd8e // indirect
github.com/testground/sdk-go v0.2.7-0.20201112151952-8ee00c80c3ec
golang.org/x/sync v0.0.0-20200625203802-6e8e738ad208
......
......@@ -3,13 +3,18 @@ package main
import (
"context"
"crypto/rand"
"encoding/json"
"fmt"
"io"
"io/ioutil"
"net"
"net/http"
"os"
"path/filepath"
goruntime "runtime"
"runtime/pprof"
"strings"
gosync "sync"
"time"
dgbadger "github.com/dgraph-io/badger/v2"
......@@ -36,6 +41,7 @@ import (
noise "github.com/libp2p/go-libp2p-noise"
secio "github.com/libp2p/go-libp2p-secio"
tls "github.com/libp2p/go-libp2p-tls"
ma "github.com/multiformats/go-multiaddr"
"github.com/testground/sdk-go/network"
"github.com/testground/sdk-go/run"
"github.com/testground/sdk-go/runtime"
......@@ -48,6 +54,60 @@ import (
"github.com/ipfs/go-graphsync/storeutil"
)
type AddrInfo struct {
peerAddr *peer.AddrInfo
ip net.IP
}
func (pi AddrInfo) MarshalJSON() ([]byte, error) {
out := make(map[string]interface{})
peerJSON, err := pi.peerAddr.MarshalJSON()
if err != nil {
panic(fmt.Sprintf("error marshaling: %v", err))
}
out["PEER"] = string(peerJSON)
ip, err := pi.ip.MarshalText()
if err != nil {
panic(fmt.Sprintf("error marshaling: %v", err))
}
out["IP"] = string(ip)
return json.Marshal(out)
}
func (pi *AddrInfo) UnmarshalJSON(b []byte) error {
var data map[string]interface{}
err := json.Unmarshal(b, &data)
if err != nil {
panic(fmt.Sprintf("error unmarshaling: %v", err))
}
var pa peer.AddrInfo
pi.peerAddr = &pa
peerAddrData := data["PEER"].(string)
var peerData map[string]interface{}
err = json.Unmarshal([]byte(peerAddrData), &peerData)
if err != nil {
panic(err)
}
pid, err := peer.Decode(peerData["ID"].(string))
if err != nil {
panic(err)
}
pi.peerAddr.ID = pid
addrs, ok := peerData["Addrs"].([]interface{})
if ok {
for _, a := range addrs {
pi.peerAddr.Addrs = append(pi.peerAddr.Addrs, ma.StringCast(a.(string)))
}
}
if err := pi.ip.UnmarshalText([]byte(data["IP"].(string))); err != nil {
panic(fmt.Sprintf("error unmarshaling: %v", err))
}
return nil
}
var testcases = map[string]interface{}{
"stress": run.InitializedTestCaseFn(runStress),
}
......@@ -80,7 +140,7 @@ func runStress(runenv *runtime.RunEnv, initCtx *run.InitContext) error {
initCtx.MustWaitAllInstancesInitialized(ctx)
host, peers, _ := makeHost(ctx, runenv, initCtx)
host, ip, peers, _ := makeHost(ctx, runenv, initCtx)
defer host.Close()
datastore, err := createDatastore(runenv.BooleanParam("disk_store"))
......@@ -88,6 +148,9 @@ func runStress(runenv *runtime.RunEnv, initCtx *run.InitContext) error {
runenv.RecordMessage("datastore error: %s", err.Error())
return err
}
maxMemoryPerPeer := runenv.SizeParam("max_memory_per_peer")
maxMemoryTotal := runenv.SizeParam("max_memory_total")
var (
// make datastore, blockstore, dag service, graphsync
bs = blockstore.NewBlockstore(dss.MutexWrap(datastore))
......@@ -96,6 +159,8 @@ func runStress(runenv *runtime.RunEnv, initCtx *run.InitContext) error {
gsnet.NewFromLibp2pHost(host),
storeutil.LoaderForBlockstore(bs),
storeutil.StorerForBlockstore(bs),
gsi.MaxMemoryPerPeerResponder(maxMemoryPerPeer),
gsi.MaxMemoryResponder(maxMemoryTotal),
)
recorder = &runRecorder{memorySnapshots: memorySnapshots, runenv: runenv}
)
......@@ -111,13 +176,36 @@ func runStress(runenv *runtime.RunEnv, initCtx *run.InitContext) error {
runenv.RecordMessage("we are the provider")
defer runenv.RecordMessage("done provider")
startTimes := make(map[struct {
peer.ID
gs.RequestID
}]time.Time)
var startTimesLk gosync.Mutex
gsync.RegisterIncomingRequestHook(func(p peer.ID, request gs.RequestData, hookActions gs.IncomingRequestHookActions) {
hookActions.ValidateRequest()
startTimesLk.Lock()
startTimes[struct {
peer.ID
gs.RequestID
}{p, request.ID()}] = time.Now()
startTimesLk.Unlock()
})
gsync.RegisterCompletedResponseListener(func(p peer.ID, request gs.RequestData, status gs.ResponseStatusCode) {
startTimesLk.Lock()
startTime, ok := startTimes[struct {
peer.ID
gs.RequestID
}{p, request.ID()}]
startTimesLk.Unlock()
if ok && status == gs.RequestCompletedFull {
duration := time.Since(startTime)
recorder.recordRun(duration)
}
})
gsync.RegisterBlockSentListener(func(p peer.ID, request gs.RequestData, block gs.BlockData) {
recorder.recordBlock()
})
err := runProvider(ctx, runenv, initCtx, dagsrv, size, networkParams, concurrency, memorySnapshots, recorder)
err := runProvider(ctx, runenv, initCtx, dagsrv, size, ip, networkParams, concurrency, memorySnapshots, recorder)
if err != nil {
runenv.RecordMessage("Error running provider: %s", err.Error())
}
......@@ -129,8 +217,8 @@ func runStress(runenv *runtime.RunEnv, initCtx *run.InitContext) error {
recorder.recordBlock()
})
p := *peers[0]
if err := host.Connect(ctx, p); err != nil {
p := peers[0]
if err := host.Connect(ctx, *p.peerAddr); err != nil {
return err
}
runenv.RecordMessage("done dialling provider")
......@@ -159,8 +247,12 @@ func parseNetworkConfig(runenv *runtime.RunEnv) []networkParams {
// prepend bandwidth=0 and latency=0 zero values; the first iteration will
// be a control iteration. The sidecar interprets zero values as no
// limitation on that attribute.
bandwidths = append([]uint64{0}, bandwidths...)
latencies = append([]time.Duration{0}, latencies...)
if runenv.BooleanParam("unlimited_bandwidth_case") {
bandwidths = append([]uint64{0}, bandwidths...)
}
if runenv.BooleanParam("no_latency_case") {
latencies = append([]time.Duration{0}, latencies...)
}
var ret []networkParams
for _, bandwidth := range bandwidths {
......@@ -200,13 +292,15 @@ func parseMemorySnapshotsParam(runenv *runtime.RunEnv) snapshotMode {
}
}
func runRequestor(ctx context.Context, runenv *runtime.RunEnv, initCtx *run.InitContext, gsync gs.GraphExchange, p peer.AddrInfo, dagsrv format.DAGService, networkParams []networkParams, concurrency int, size uint64, memorySnapshots snapshotMode, recorder *runRecorder) error {
func runRequestor(ctx context.Context, runenv *runtime.RunEnv, initCtx *run.InitContext, gsync gs.GraphExchange, p *AddrInfo, dagsrv format.DAGService, networkParams []networkParams, concurrency int, size uint64, memorySnapshots snapshotMode, recorder *runRecorder) error {
var (
cids []cid.Cid
// create a selector for the whole UnixFS dag
sel = allselector.AllSelector
)
runHTTPTest := runenv.BooleanParam("compare_http")
for round, np := range networkParams {
var (
topicCid = sync.NewTopic(fmt.Sprintf("cid-%d", round), []cid.Cid{})
......@@ -215,17 +309,14 @@ func runRequestor(ctx context.Context, runenv *runtime.RunEnv, initCtx *run.Init
stateFinish = sync.State(fmt.Sprintf("finish-%d", round))
)
recorder.beginRun(np, size, concurrency)
// wait for all instances to be ready for the next state.
initCtx.SyncClient.MustSignalAndWait(ctx, stateNext, runenv.TestInstanceCount)
recorder.beginRun(np, size, concurrency, round)
// clean up previous CIDs to attempt to free memory
// TODO does this work?
_ = dagsrv.RemoveMany(ctx, cids)
runenv.RecordMessage("===== ROUND %d: latency=%s, bandwidth=%d =====", round, np.latency, np.bandwidth)
sctx, scancel := context.WithCancel(ctx)
cidCh := make(chan []cid.Cid, 1)
initCtx.SyncClient.MustSubscribe(sctx, topicCid, cidCh)
......@@ -240,8 +331,7 @@ func runRequestor(ctx context.Context, runenv *runtime.RunEnv, initCtx *run.Init
errgrp, grpctx := errgroup.WithContext(ctx)
for _, c := range cids {
c := c // capture
np := np // capture
c := c // capture
errgrp.Go(func() error {
// make a go-ipld-prime link for the root UnixFS node
......@@ -251,7 +341,7 @@ func runRequestor(ctx context.Context, runenv *runtime.RunEnv, initCtx *run.Init
runenv.RecordMessage("\t>>> requesting CID %s", c)
start := time.Now()
respCh, errCh := gsync.Request(grpctx, p.ID, clink, sel)
respCh, errCh := gsync.Request(grpctx, p.peerAddr.ID, clink, sel)
for range respCh {
}
for err := range errCh {
......@@ -259,20 +349,31 @@ func runRequestor(ctx context.Context, runenv *runtime.RunEnv, initCtx *run.Init
}
dur := time.Since(start)
runenv.RecordMessage("\t<<< request complete with no errors")
runenv.RecordMessage("***** ROUND %d observed duration (lat=%s,bw=%d): %s", round, np.latency, np.bandwidth, dur)
measurement := fmt.Sprintf("duration.sec,lat=%s,bw=%s,concurrency=%d,size=%s", np.latency, humanize.IBytes(np.bandwidth), concurrency, humanize.Bytes(size))
measurement = strings.Replace(measurement, " ", "", -1)
runenv.R().RecordPoint(measurement, float64(dur)/float64(time.Second))
recorder.recordRun(dur)
// verify that we have the CID now.
if node, err := dagsrv.Get(grpctx, c); err != nil {
return err
} else if node == nil {
return fmt.Errorf("finished graphsync request, but CID not in store")
}
if runHTTPTest {
// request file directly over http
start = time.Now()
file, err := ioutil.TempFile(os.TempDir(), fmt.Sprintf("%s-", c.String()))
if err != nil {
panic(err)
}
resp, err := http.Get(fmt.Sprintf("http://%s:8080/%s", p.ip.String(), c.String()))
if err != nil {
panic(err)
}
bytesRead, err := io.Copy(file, resp.Body)
if err != nil {
panic(err)
}
dur = time.Since(start)
recorder.recordHTTPRun(dur, bytesRead)
}
return nil
})
}
......@@ -292,12 +393,26 @@ func runRequestor(ctx context.Context, runenv *runtime.RunEnv, initCtx *run.Init
return nil
}
func runProvider(ctx context.Context, runenv *runtime.RunEnv, initCtx *run.InitContext, dagsrv format.DAGService, size uint64, networkParams []networkParams, concurrency int, memorySnapshots snapshotMode, recorder *runRecorder) error {
func runProvider(ctx context.Context, runenv *runtime.RunEnv, initCtx *run.InitContext, dagsrv format.DAGService, size uint64, ip net.IP, networkParams []networkParams, concurrency int, memorySnapshots snapshotMode, recorder *runRecorder) error {
var (
cids []cid.Cid
bufferedDS = format.NewBufferedDAG(ctx, dagsrv)
)
runHTTPTest := runenv.BooleanParam("compare_http")
var svr *http.Server
if runHTTPTest {
// start an http server on port 8080
runenv.RecordMessage("creating http server at http://%s:8080", ip.String())
svr = &http.Server{Addr: ":8080"}
go func() {
if err := svr.ListenAndServe(); err != nil {
runenv.RecordMessage("shutdown http server at http://%s:8080", ip.String())
}
}()
}
for round, np := range networkParams {
var (
topicCid = sync.NewTopic(fmt.Sprintf("cid-%d", round), []cid.Cid{})
......@@ -305,10 +420,10 @@ func runProvider(ctx context.Context, runenv *runtime.RunEnv, initCtx *run.InitC
stateFinish = sync.State(fmt.Sprintf("finish-%d", round))
stateNet = sync.State(fmt.Sprintf("network-configured-%d", round))
)
recorder.beginRun(np, size, concurrency)
// wait for all instances to be ready for the next state.
initCtx.SyncClient.MustSignalAndWait(ctx, stateNext, runenv.TestInstanceCount)
recorder.beginRun(np, size, concurrency, round)
// remove the previous CIDs from the dag service; hopefully this
// will delete them from the store and free up memory.
......@@ -317,12 +432,17 @@ func runProvider(ctx context.Context, runenv *runtime.RunEnv, initCtx *run.InitC
}
cids = cids[:0]
runenv.RecordMessage("===== ROUND %d: latency=%s, bandwidth=%d =====", round, np.latency, np.bandwidth)
// generate as many random files as the concurrency level.
for i := 0; i < concurrency; i++ {
// file with random data
file := files.NewReaderFile(io.LimitReader(rand.Reader, int64(size)))
data := files.NewReaderFile(io.LimitReader(rand.Reader, int64(size)))
file, err := ioutil.TempFile(os.TempDir(), "unixfs-")
if err != nil {
panic(err)
}
if _, err := io.Copy(file, data); err != nil {
panic(err)
}
unixfsChunkSize := uint64(1) << runenv.IntParam("chunk_size")
unixfsLinksPerLevel := runenv.IntParam("links_per_level")
......@@ -334,6 +454,9 @@ func runProvider(ctx context.Context, runenv *runtime.RunEnv, initCtx *run.InitC
Dagserv: bufferedDS,
}
if _, err := file.Seek(0, 0); err != nil {
panic(err)
}
db, err := params.New(chunk.NewSizeSplitter(file, int64(unixfsChunkSize)))
if err != nil {
return fmt.Errorf("unable to setup dag builder: %w", err)
......@@ -344,6 +467,20 @@ func runProvider(ctx context.Context, runenv *runtime.RunEnv, initCtx *run.InitC
return fmt.Errorf("unable to create unix fs node: %w", err)
}
if runHTTPTest {
// set up http server to send file
http.HandleFunc(fmt.Sprintf("/%s", node.Cid()), func(w http.ResponseWriter, r *http.Request) {
fileReader, err := os.Open(file.Name())
defer fileReader.Close()
if err != nil {
panic(err)
}
_, err = io.Copy(w, fileReader)
if err != nil {
panic(err)
}
})
}
cids = append(cids, node.Cid())
}
......@@ -382,10 +519,15 @@ func runProvider(ctx context.Context, runenv *runtime.RunEnv, initCtx *run.InitC
}
if runHTTPTest {
if err := svr.Shutdown(ctx); err != nil {
panic(err)
}
}
return nil
}
func makeHost(ctx context.Context, runenv *runtime.RunEnv, initCtx *run.InitContext) (host.Host, []*peer.AddrInfo, *metrics.BandwidthCounter) {
func makeHost(ctx context.Context, runenv *runtime.RunEnv, initCtx *run.InitContext) (host.Host, net.IP, []*AddrInfo, *metrics.BandwidthCounter) {
secureChannel := runenv.StringParam("secure_channel")
var security libp2p.Option
......@@ -421,18 +563,21 @@ func makeHost(ctx context.Context, runenv *runtime.RunEnv, initCtx *run.InitCont
ai = &peer.AddrInfo{ID: id, Addrs: host.Addrs()}
// the peers topic where all instances will advertise their AddrInfo.
peersTopic = sync.NewTopic("peers", new(peer.AddrInfo))
peersTopic = sync.NewTopic("peers", new(AddrInfo))
// initialize a slice to store the AddrInfos of all other peers in the run.
peers = make([]*peer.AddrInfo, 0, runenv.TestInstanceCount-1)
peers = make([]*AddrInfo, 0, runenv.TestInstanceCount-1)
)
// Publish our own.
initCtx.SyncClient.MustPublish(ctx, peersTopic, ai)
initCtx.SyncClient.MustPublish(ctx, peersTopic, &AddrInfo{
peerAddr: ai,
ip: ip,
})
// Now subscribe to the peers topic and consume all addresses, storing them
// in the peers slice.
peersCh := make(chan *peer.AddrInfo)
peersCh := make(chan *AddrInfo)
sctx, scancel := context.WithCancel(ctx)
defer scancel()
......@@ -442,7 +587,7 @@ func makeHost(ctx context.Context, runenv *runtime.RunEnv, initCtx *run.InitCont
for len(peers) < cap(peers) {
select {
case ai := <-peersCh:
if ai.ID == id {
if ai.peerAddr.ID == id {
continue // skip over ourselves.
}
peers = append(peers, ai)
......@@ -451,7 +596,7 @@ func makeHost(ctx context.Context, runenv *runtime.RunEnv, initCtx *run.InitCont
}
}
return host, peers, bwcounter
return host, ip, peers, bwcounter
}
func createDatastore(diskStore bool) (ds.Datastore, error) {
......@@ -521,7 +666,9 @@ type runRecorder struct {
np networkParams
size uint64
concurrency int
round int
runenv *runtime.RunEnv
measurement string
}
func (rr *runRecorder) recordBlock() {
......@@ -533,9 +680,27 @@ func (rr *runRecorder) recordBlock() {
rr.index++
}
func (rr *runRecorder) beginRun(np networkParams, size uint64, concurrency int) {
func (rr *runRecorder) recordRun(duration time.Duration) {
rr.runenv.RecordMessage("\t<<< graphsync request complete with no errors")
rr.runenv.RecordMessage("***** ROUND %d observed duration (lat=%s,bw=%d): %s", rr.round, rr.np.latency, rr.np.bandwidth, duration)
rr.runenv.R().RecordPoint(rr.measurement+",transport=graphsync", float64(duration)/float64(time.Second))
}
func (rr *runRecorder) recordHTTPRun(duration time.Duration, bytesRead int64) {
rr.runenv.RecordMessage(fmt.Sprintf("\t<<< http request complete with no errors, read %d bytes", bytesRead))
rr.runenv.RecordMessage("***** ROUND %d observed http duration (lat=%s,bw=%d): %s", rr.round, rr.np.latency, rr.np.bandwidth, duration)
rr.runenv.R().RecordPoint(rr.measurement+",transport=http", float64(duration)/float64(time.Second))
}
func (rr *runRecorder) beginRun(np networkParams, size uint64, concurrency int, round int) {
rr.concurrency = concurrency
rr.np = np
rr.size = size
rr.index = 0
rr.round = round
rr.runenv.RecordMessage("===== ROUND %d: latency=%s, bandwidth=%d =====", rr.round, rr.np.latency, rr.np.bandwidth)
measurement := fmt.Sprintf("duration.sec,lat=%s,bw=%s,concurrency=%d,size=%s", rr.np.latency, humanize.IBytes(rr.np.bandwidth), rr.concurrency, humanize.Bytes(rr.size))
measurement = strings.Replace(measurement, " ", "", -1)
rr.measurement = measurement
}
......@@ -27,3 +27,8 @@ links_per_level = { type = "int", desc = "unixfs links per level", default = "10
raw_leaves = { type = "bool", desc = "should unixfs leaves be left unwrapped", default = "true"}
disk_store = { type = "bool", desc = "should data be stored on disk (true) or memory (false)", default = "false"}
memory_snapshots = { type = "string", desc = "what kind of memory snapshots to take (none, simple, detailed)", default = "none" }
compare_http = { type = "bool", desc = "run a comparison against http", default = "true"}
max_memory_per_peer = { type = "int", desc = "max memory a responder can queue up per peer", default = "64MiB"}
max_memory_total = { type = "int", desc = "max memory a responder can queue up total", default = "512MiB"}
unlimited_bandwidth_case = { type = "bool", desc = "run a comparison against http", default = "true"}
no_latency_case = { type = "bool", desc = "run a comparison against http", default = "true"}
\ No newline at end of file
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment