Unverified Commit 18c401d8 authored by Steven Allen's avatar Steven Allen Committed by GitHub

Merge pull request #42 from ipfs/feat/bandwidth-limited-tests

Feat/bandwidth limited tests
parents 916de59d 48f53bbc
...@@ -9,9 +9,10 @@ import ( ...@@ -9,9 +9,10 @@ import (
"testing" "testing"
"time" "time"
tn "github.com/ipfs/go-bitswap/testnet" "github.com/ipfs/go-bitswap/testutil"
bssession "github.com/ipfs/go-bitswap/session" bssession "github.com/ipfs/go-bitswap/session"
tn "github.com/ipfs/go-bitswap/testnet"
"github.com/ipfs/go-block-format" "github.com/ipfs/go-block-format"
cid "github.com/ipfs/go-cid" cid "github.com/ipfs/go-cid"
blocksutil "github.com/ipfs/go-ipfs-blocksutil" blocksutil "github.com/ipfs/go-ipfs-blocksutil"
...@@ -34,6 +35,7 @@ type runStats struct { ...@@ -34,6 +35,7 @@ type runStats struct {
var benchmarkLog []runStats var benchmarkLog []runStats
func BenchmarkDups2Nodes(b *testing.B) { func BenchmarkDups2Nodes(b *testing.B) {
benchmarkLog = nil
fixedDelay := delay.Fixed(10 * time.Millisecond) fixedDelay := delay.Fixed(10 * time.Millisecond)
b.Run("AllToAll-OneAtATime", func(b *testing.B) { b.Run("AllToAll-OneAtATime", func(b *testing.B) {
subtestDistributeAndFetch(b, 3, 100, fixedDelay, allToAll, oneAtATime) subtestDistributeAndFetch(b, 3, 100, fixedDelay, allToAll, oneAtATime)
...@@ -93,7 +95,7 @@ func BenchmarkDups2Nodes(b *testing.B) { ...@@ -93,7 +95,7 @@ func BenchmarkDups2Nodes(b *testing.B) {
subtestDistributeAndFetch(b, 200, 20, fixedDelay, allToAll, batchFetchAll) subtestDistributeAndFetch(b, 200, 20, fixedDelay, allToAll, batchFetchAll)
}) })
out, _ := json.MarshalIndent(benchmarkLog, "", " ") out, _ := json.MarshalIndent(benchmarkLog, "", " ")
ioutil.WriteFile("benchmark.json", out, 0666) ioutil.WriteFile("tmp/benchmark.json", out, 0666)
} }
const fastSpeed = 60 * time.Millisecond const fastSpeed = 60 * time.Millisecond
...@@ -101,35 +103,49 @@ const mediumSpeed = 200 * time.Millisecond ...@@ -101,35 +103,49 @@ const mediumSpeed = 200 * time.Millisecond
const slowSpeed = 800 * time.Millisecond const slowSpeed = 800 * time.Millisecond
const superSlowSpeed = 4000 * time.Millisecond const superSlowSpeed = 4000 * time.Millisecond
const distribution = 20 * time.Millisecond const distribution = 20 * time.Millisecond
const fastBandwidth = 1250000.0
const fastBandwidthDeviation = 300000.0
const mediumBandwidth = 500000.0
const mediumBandwidthDeviation = 80000.0
const slowBandwidth = 100000.0
const slowBandwidthDeviation = 16500.0
const stdBlockSize = 8000
func BenchmarkDupsManyNodesRealWorldNetwork(b *testing.B) { func BenchmarkDupsManyNodesRealWorldNetwork(b *testing.B) {
benchmarkLog = nil
fastNetworkDelayGenerator := tn.InternetLatencyDelayGenerator( fastNetworkDelayGenerator := tn.InternetLatencyDelayGenerator(
mediumSpeed-fastSpeed, slowSpeed-fastSpeed, mediumSpeed-fastSpeed, slowSpeed-fastSpeed,
0.0, 0.0, distribution, nil) 0.0, 0.0, distribution, nil)
fastNetworkDelay := delay.Delay(fastSpeed, fastNetworkDelayGenerator) fastNetworkDelay := delay.Delay(fastSpeed, fastNetworkDelayGenerator)
fastBandwidthGenerator := tn.VariableRateLimitGenerator(fastBandwidth, fastBandwidthDeviation, nil)
averageNetworkDelayGenerator := tn.InternetLatencyDelayGenerator( averageNetworkDelayGenerator := tn.InternetLatencyDelayGenerator(
mediumSpeed-fastSpeed, slowSpeed-fastSpeed, mediumSpeed-fastSpeed, slowSpeed-fastSpeed,
0.3, 0.3, distribution, nil) 0.3, 0.3, distribution, nil)
averageNetworkDelay := delay.Delay(fastSpeed, averageNetworkDelayGenerator) averageNetworkDelay := delay.Delay(fastSpeed, averageNetworkDelayGenerator)
averageBandwidthGenerator := tn.VariableRateLimitGenerator(mediumBandwidth, mediumBandwidthDeviation, nil)
slowNetworkDelayGenerator := tn.InternetLatencyDelayGenerator( slowNetworkDelayGenerator := tn.InternetLatencyDelayGenerator(
mediumSpeed-fastSpeed, superSlowSpeed-fastSpeed, mediumSpeed-fastSpeed, superSlowSpeed-fastSpeed,
0.3, 0.3, distribution, nil) 0.3, 0.3, distribution, nil)
slowNetworkDelay := delay.Delay(fastSpeed, slowNetworkDelayGenerator) slowNetworkDelay := delay.Delay(fastSpeed, slowNetworkDelayGenerator)
slowBandwidthGenerator := tn.VariableRateLimitGenerator(slowBandwidth, slowBandwidthDeviation, nil)
b.Run("200Nodes-AllToAll-BigBatch-FastNetwork", func(b *testing.B) { b.Run("200Nodes-AllToAll-BigBatch-FastNetwork", func(b *testing.B) {
subtestDistributeAndFetch(b, 300, 200, fastNetworkDelay, allToAll, batchFetchAll) subtestDistributeAndFetchRateLimited(b, 300, 200, fastNetworkDelay, fastBandwidthGenerator, stdBlockSize, allToAll, batchFetchAll)
}) })
b.Run("200Nodes-AllToAll-BigBatch-AverageVariableSpeedNetwork", func(b *testing.B) { b.Run("200Nodes-AllToAll-BigBatch-AverageVariableSpeedNetwork", func(b *testing.B) {
subtestDistributeAndFetch(b, 300, 200, averageNetworkDelay, allToAll, batchFetchAll) subtestDistributeAndFetchRateLimited(b, 300, 200, averageNetworkDelay, averageBandwidthGenerator, stdBlockSize, allToAll, batchFetchAll)
}) })
b.Run("200Nodes-AllToAll-BigBatch-SlowVariableSpeedNetwork", func(b *testing.B) { b.Run("200Nodes-AllToAll-BigBatch-SlowVariableSpeedNetwork", func(b *testing.B) {
subtestDistributeAndFetch(b, 300, 200, slowNetworkDelay, allToAll, batchFetchAll) subtestDistributeAndFetchRateLimited(b, 300, 200, slowNetworkDelay, slowBandwidthGenerator, stdBlockSize, allToAll, batchFetchAll)
}) })
out, _ := json.MarshalIndent(benchmarkLog, "", " ")
ioutil.WriteFile("tmp/rw-benchmark.json", out, 0666)
} }
func subtestDistributeAndFetch(b *testing.B, numnodes, numblks int, d delay.D, df distFunc, ff fetchFunc) { func subtestDistributeAndFetch(b *testing.B, numnodes, numblks int, d delay.D, df distFunc, ff fetchFunc) {
start := time.Now() start := time.Now()
net := tn.VirtualNetwork(mockrouting.NewServer(), d) net := tn.VirtualNetwork(mockrouting.NewServer(), d)
sg := NewTestSessionGenerator(net) sg := NewTestSessionGenerator(net)
defer sg.Close() defer sg.Close()
...@@ -137,6 +153,25 @@ func subtestDistributeAndFetch(b *testing.B, numnodes, numblks int, d delay.D, d ...@@ -137,6 +153,25 @@ func subtestDistributeAndFetch(b *testing.B, numnodes, numblks int, d delay.D, d
instances := sg.Instances(numnodes) instances := sg.Instances(numnodes)
blocks := bg.Blocks(numblks) blocks := bg.Blocks(numblks)
runDistribution(b, instances, blocks, df, ff, start)
}
func subtestDistributeAndFetchRateLimited(b *testing.B, numnodes, numblks int, d delay.D, rateLimitGenerator tn.RateLimitGenerator, blockSize int64, df distFunc, ff fetchFunc) {
start := time.Now()
net := tn.RateLimitedVirtualNetwork(mockrouting.NewServer(), d, rateLimitGenerator)
sg := NewTestSessionGenerator(net)
defer sg.Close()
instances := sg.Instances(numnodes)
blocks := testutil.GenerateBlocksOfSize(numblks, blockSize)
runDistribution(b, instances, blocks, df, ff, start)
}
func runDistribution(b *testing.B, instances []Instance, blocks []blocks.Block, df distFunc, ff fetchFunc, start time.Time) {
numnodes := len(instances)
fetcher := instances[numnodes-1] fetcher := instances[numnodes-1]
......
...@@ -9,9 +9,9 @@ ...@@ -9,9 +9,9 @@
"gxDependencies": [ "gxDependencies": [
{ {
"author": "whyrusleeping", "author": "whyrusleeping",
"hash": "QmdJdFQc5U3RAKgJQGmWR7SSM7TLuER5FWz5Wq6Tzs2CnS", "hash": "QmYxivS34F2M2n44WQQnRHGAKS8aoRUxwGpi9wk4Cdn4Jf",
"name": "go-libp2p", "name": "go-libp2p",
"version": "6.0.29" "version": "6.0.30"
}, },
{ {
"author": "hsanjuan", "author": "hsanjuan",
...@@ -184,6 +184,12 @@ ...@@ -184,6 +184,12 @@
"hash": "Qmf7HqcW7LtCi1W8y2bdx2eJpze74jkbKqpByxgXikdbLF", "hash": "Qmf7HqcW7LtCi1W8y2bdx2eJpze74jkbKqpByxgXikdbLF",
"name": "go-detect-race", "name": "go-detect-race",
"version": "1.0.1" "version": "1.0.1"
},
{
"author": "jbenet",
"hash": "QmSJ9n2s9NUoA9D849W5jj5SJ94nMcZpj1jCgQJieiNqSt",
"name": "go-random",
"version": "1.0.0"
} }
], ],
"gxVersion": "0.12.1", "gxVersion": "0.12.1",
......
package bitswap
import (
"math/rand"
)
type fixedRateLimitGenerator struct {
rateLimit float64
}
// FixedRateLimitGenerator returns a rate limit generatoe that always generates
// the specified rate limit in bytes/sec.
func FixedRateLimitGenerator(rateLimit float64) RateLimitGenerator {
return &fixedRateLimitGenerator{rateLimit}
}
func (rateLimitGenerator *fixedRateLimitGenerator) NextRateLimit() float64 {
return rateLimitGenerator.rateLimit
}
type variableRateLimitGenerator struct {
rateLimit float64
std float64
rng *rand.Rand
}
// VariableRateLimitGenerator makes rate limites that following a normal distribution.
func VariableRateLimitGenerator(rateLimit float64, std float64, rng *rand.Rand) RateLimitGenerator {
if rng == nil {
rng = sharedRNG
}
return &variableRateLimitGenerator{
std: std,
rng: rng,
rateLimit: rateLimit,
}
}
func (rateLimitGenerator *variableRateLimitGenerator) NextRateLimit() float64 {
return rateLimitGenerator.rng.NormFloat64()*rateLimitGenerator.std + rateLimitGenerator.rateLimit
}
...@@ -18,6 +18,7 @@ import ( ...@@ -18,6 +18,7 @@ import (
ifconnmgr "github.com/libp2p/go-libp2p-interface-connmgr" ifconnmgr "github.com/libp2p/go-libp2p-interface-connmgr"
peer "github.com/libp2p/go-libp2p-peer" peer "github.com/libp2p/go-libp2p-peer"
routing "github.com/libp2p/go-libp2p-routing" routing "github.com/libp2p/go-libp2p-routing"
mocknet "github.com/libp2p/go-libp2p/p2p/net/mock"
testutil "github.com/libp2p/go-testutil" testutil "github.com/libp2p/go-testutil"
) )
...@@ -29,6 +30,25 @@ func VirtualNetwork(rs mockrouting.Server, d delay.D) Network { ...@@ -29,6 +30,25 @@ func VirtualNetwork(rs mockrouting.Server, d delay.D) Network {
clients: make(map[peer.ID]*receiverQueue), clients: make(map[peer.ID]*receiverQueue),
delay: d, delay: d,
routingserver: rs, routingserver: rs,
isRateLimited: false,
rateLimitGenerator: nil,
conns: make(map[string]struct{}),
}
}
type RateLimitGenerator interface {
NextRateLimit() float64
}
func RateLimitedVirtualNetwork(rs mockrouting.Server, d delay.D, rateLimitGenerator RateLimitGenerator) Network {
return &network{
latencies: make(map[peer.ID]map[peer.ID]time.Duration),
rateLimiters: make(map[peer.ID]map[peer.ID]*mocknet.RateLimiter),
clients: make(map[peer.ID]*receiverQueue),
delay: d,
routingserver: rs,
isRateLimited: true,
rateLimitGenerator: rateLimitGenerator,
conns: make(map[string]struct{}), conns: make(map[string]struct{}),
} }
} }
...@@ -36,9 +56,12 @@ func VirtualNetwork(rs mockrouting.Server, d delay.D) Network { ...@@ -36,9 +56,12 @@ func VirtualNetwork(rs mockrouting.Server, d delay.D) Network {
type network struct { type network struct {
mu sync.Mutex mu sync.Mutex
latencies map[peer.ID]map[peer.ID]time.Duration latencies map[peer.ID]map[peer.ID]time.Duration
rateLimiters map[peer.ID]map[peer.ID]*mocknet.RateLimiter
clients map[peer.ID]*receiverQueue clients map[peer.ID]*receiverQueue
routingserver mockrouting.Server routingserver mockrouting.Server
delay delay.D delay delay.D
isRateLimited bool
rateLimitGenerator RateLimitGenerator
conns map[string]struct{} conns map[string]struct{}
} }
...@@ -102,6 +125,26 @@ func (n *network) SendMessage( ...@@ -102,6 +125,26 @@ func (n *network) SendMessage(
latencies[to] = latency latencies[to] = latency
} }
var bandwidthDelay time.Duration
if n.isRateLimited {
rateLimiters, ok := n.rateLimiters[from]
if !ok {
rateLimiters = make(map[peer.ID]*mocknet.RateLimiter)
n.rateLimiters[from] = rateLimiters
}
rateLimiter, ok := rateLimiters[to]
if !ok {
rateLimiter = mocknet.NewRateLimiter(n.rateLimitGenerator.NextRateLimit())
rateLimiters[to] = rateLimiter
}
size := mes.ToProtoV1().Size()
bandwidthDelay = rateLimiter.Limit(size)
} else {
bandwidthDelay = 0
}
receiver, ok := n.clients[to] receiver, ok := n.clients[to]
if !ok { if !ok {
return errors.New("cannot locate peer on network") return errors.New("cannot locate peer on network")
...@@ -113,7 +156,7 @@ func (n *network) SendMessage( ...@@ -113,7 +156,7 @@ func (n *network) SendMessage(
msg := &message{ msg := &message{
from: from, from: from,
msg: mes, msg: mes,
shouldSend: time.Now().Add(latency), shouldSend: time.Now().Add(latency).Add(bandwidthDelay),
} }
receiver.enqueue(msg) receiver.enqueue(msg)
......
package testutil package testutil
import ( import (
"bytes"
random "github.com/jbenet/go-random"
bsmsg "github.com/ipfs/go-bitswap/message" bsmsg "github.com/ipfs/go-bitswap/message"
"github.com/ipfs/go-bitswap/wantlist" "github.com/ipfs/go-bitswap/wantlist"
"github.com/ipfs/go-block-format" "github.com/ipfs/go-block-format"
...@@ -11,6 +15,25 @@ import ( ...@@ -11,6 +15,25 @@ import (
var blockGenerator = blocksutil.NewBlockGenerator() var blockGenerator = blocksutil.NewBlockGenerator()
var prioritySeq int var prioritySeq int
var seedSeq int64
func randomBytes(n int64, seed int64) []byte {
data := new(bytes.Buffer)
random.WritePseudoRandomBytes(n, data, seed)
return data.Bytes()
}
// GenerateBlocksOfSize generates a series of blocks of the given byte size
func GenerateBlocksOfSize(n int, size int64) []blocks.Block {
generatedBlocks := make([]blocks.Block, 0, n)
for i := 0; i < n; i++ {
seedSeq++
b := blocks.NewBlock(randomBytes(size, seedSeq))
generatedBlocks = append(generatedBlocks, b)
}
return generatedBlocks
}
// GenerateCids produces n content identifiers. // GenerateCids produces n content identifiers.
func GenerateCids(n int) []cid.Cid { func GenerateCids(n int) []cid.Cid {
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment