Unverified Commit d88611c4 authored by Hannah Howard's avatar Hannah Howard Committed by GitHub

feat(benchmarks): add p2p stress test (#91)

parent bd2d62f5
......@@ -3,9 +3,9 @@ package graphsync_test
import (
"bytes"
"context"
"crypto/rand"
"fmt"
"io/ioutil"
"math/rand"
"os"
"runtime"
"strings"
......@@ -31,6 +31,7 @@ import (
basicnode "github.com/ipld/go-ipld-prime/node/basic"
ipldselector "github.com/ipld/go-ipld-prime/traversal/selector"
"github.com/ipld/go-ipld-prime/traversal/selector/builder"
mocknet "github.com/libp2p/go-libp2p/p2p/net/mock"
"github.com/stretchr/testify/require"
)
......@@ -48,10 +49,82 @@ func BenchmarkRoundtripSuccess(b *testing.B) {
tdm, err := newTempDirMaker(b)
require.NoError(b, err)
b.Run("test-20-10000", func(b *testing.B) {
subtestDistributeAndFetch(ctx, b, 20, delay.Fixed(0), time.Duration(0), allFilesUniformSize(10000), tdm)
subtestDistributeAndFetch(ctx, b, 20, delay.Fixed(0), time.Duration(0), allFilesUniformSize(10000, defaultUnixfsChunkSize, defaultUnixfsLinksPerLevel), tdm)
})
b.Run("test-p2p-stress-10-128MB", func(b *testing.B) {
p2pStrestTest(ctx, b, 20, allFilesUniformSize(128*(1<<20), 1<<20, 1024), tdm)
})
}
func p2pStrestTest(ctx context.Context, b *testing.B, numfiles int, df distFunc, tdm *tempDirMaker) {
ctx, cancel := context.WithCancel(ctx)
defer cancel()
mn := mocknet.New(ctx)
mn.SetLinkDefaults(mocknet.LinkOptions{Latency: 100 * time.Millisecond, Bandwidth: 3000000})
net := tn.StreamNet(ctx, mn)
ig := testinstance.NewTestInstanceGenerator(ctx, net, nil, tdm)
instances, err := ig.Instances(1 + b.N)
require.NoError(b, err)
var allCids []cid.Cid
for i := 0; i < numfiles; i++ {
thisCids := df(ctx, b, instances[:1])
allCids = append(allCids, thisCids...)
}
ssb := builder.NewSelectorSpecBuilder(basicnode.Style.Any)
allSelector := ssb.ExploreRecursive(ipldselector.RecursionLimitNone(),
ssb.ExploreAll(ssb.ExploreRecursiveEdge())).Node()
runtime.GC()
b.ResetTimer()
b.ReportAllocs()
for i := 0; i < b.N; i++ {
fetcher := instances[i+1]
var wg sync.WaitGroup
ctx, cancel := context.WithTimeout(ctx, 10*time.Second)
require.NoError(b, err)
start := time.Now()
disconnectOn := rand.Intn(numfiles)
for j := 0; j < numfiles; j++ {
resultChan, errChan := fetcher.Exchange.Request(ctx, instances[0].Peer, cidlink.Link{Cid: allCids[j]}, allSelector)
wg.Add(1)
go func(j int) {
defer wg.Done()
results := 0
for {
select {
case <-ctx.Done():
return
case <-resultChan:
results++
if results == 100 && j == disconnectOn {
mn.DisconnectPeers(instances[0].Peer, instances[i+1].Peer)
mn.UnlinkPeers(instances[0].Peer, instances[i+1].Peer)
time.Sleep(100 * time.Millisecond)
mn.LinkPeers(instances[0].Peer, instances[i+1].Peer)
}
case err, ok := <-errChan:
if !ok {
return
}
b.Fatalf("received error on request: %s", err.Error())
}
}
}(j)
}
wg.Wait()
result := runStats{
Time: time.Since(start),
Name: b.Name(),
}
benchmarkLog = append(benchmarkLog, result)
cancel()
fetcher.Close()
}
testinstance.Close(instances)
ig.Close()
}
func subtestDistributeAndFetch(ctx context.Context, b *testing.B, numnodes int, d delay.D, bstoreLatency time.Duration, df distFunc, tdm *tempDirMaker) {
ctx, cancel := context.WithCancel(ctx)
defer cancel()
......@@ -116,10 +189,10 @@ func subtestDistributeAndFetch(ctx context.Context, b *testing.B, numnodes int,
type distFunc func(ctx context.Context, b *testing.B, provs []testinstance.Instance) []cid.Cid
const unixfsChunkSize uint64 = 1 << 10
const unixfsLinksPerLevel = 1024
const defaultUnixfsChunkSize uint64 = 1 << 10
const defaultUnixfsLinksPerLevel = 1024
func loadRandomUnixFxFile(ctx context.Context, b *testing.B, bs blockstore.Blockstore, size uint64) cid.Cid {
func loadRandomUnixFxFile(ctx context.Context, b *testing.B, bs blockstore.Blockstore, size uint64, unixfsChunkSize uint64, unixfsLinksPerLevel int) cid.Cid {
data := make([]byte, size)
_, err := rand.Read(data)
......@@ -151,11 +224,11 @@ func loadRandomUnixFxFile(ctx context.Context, b *testing.B, bs blockstore.Block
return nd.Cid()
}
func allFilesUniformSize(size uint64) distFunc {
func allFilesUniformSize(size uint64, unixfsChunkSize uint64, unixfsLinksPerLevel int) distFunc {
return func(ctx context.Context, b *testing.B, provs []testinstance.Instance) []cid.Cid {
cids := make([]cid.Cid, 0, len(provs))
for _, prov := range provs {
c := loadRandomUnixFxFile(ctx, b, prov.BlockStore, size)
c := loadRandomUnixFxFile(ctx, b, prov.BlockStore, size, unixfsChunkSize, unixfsLinksPerLevel)
cids = append(cids, c)
}
return cids
......
package testnet
import (
"context"
gsnet "github.com/ipfs/go-graphsync/network"
"github.com/libp2p/go-libp2p-core/peer"
tnet "github.com/libp2p/go-libp2p-testing/net"
mockpeernet "github.com/libp2p/go-libp2p/p2p/net/mock"
)
type peernet struct {
mockpeernet.Mocknet
}
// StreamNet is a testnet that uses libp2p's MockNet
func StreamNet(ctx context.Context, net mockpeernet.Mocknet) Network {
return &peernet{net}
}
func (pn *peernet) Adapter(p tnet.Identity) gsnet.GraphSyncNetwork {
client, err := pn.Mocknet.AddPeer(p.PrivateKey(), p.Address())
if err != nil {
panic(err.Error())
}
pn.Mocknet.LinkAll()
return gsnet.NewFromLibp2pHost(client)
}
func (pn *peernet) HasPeer(p peer.ID) bool {
for _, member := range pn.Mocknet.Peers() {
if p == member {
return true
}
}
return false
}
var _ Network = (*peernet)(nil)
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment