Unverified commit edf24960, authored by Hannah Howard, committed by GitHub

Merge pull request #25 from ipfs/feat/dup-blocks-test-enhancement

feat(Benchmarks): Add real world dup blocks test
parents 663e7023 39fa3c73
@@ -33,71 +33,102 @@ type runStats struct {
 var benchmarkLog []runStats
 
 func BenchmarkDups2Nodes(b *testing.B) {
+    fixedDelay := delay.Fixed(10 * time.Millisecond)
     b.Run("AllToAll-OneAtATime", func(b *testing.B) {
-        subtestDistributeAndFetch(b, 3, 100, allToAll, oneAtATime)
+        subtestDistributeAndFetch(b, 3, 100, fixedDelay, allToAll, oneAtATime)
     })
     b.Run("AllToAll-BigBatch", func(b *testing.B) {
-        subtestDistributeAndFetch(b, 3, 100, allToAll, batchFetchAll)
+        subtestDistributeAndFetch(b, 3, 100, fixedDelay, allToAll, batchFetchAll)
     })
     b.Run("Overlap1-OneAtATime", func(b *testing.B) {
-        subtestDistributeAndFetch(b, 3, 100, overlap1, oneAtATime)
+        subtestDistributeAndFetch(b, 3, 100, fixedDelay, overlap1, oneAtATime)
     })
     b.Run("Overlap2-BatchBy10", func(b *testing.B) {
-        subtestDistributeAndFetch(b, 3, 100, overlap2, batchFetchBy10)
+        subtestDistributeAndFetch(b, 3, 100, fixedDelay, overlap2, batchFetchBy10)
     })
     b.Run("Overlap3-OneAtATime", func(b *testing.B) {
-        subtestDistributeAndFetch(b, 3, 100, overlap3, oneAtATime)
+        subtestDistributeAndFetch(b, 3, 100, fixedDelay, overlap3, oneAtATime)
     })
     b.Run("Overlap3-BatchBy10", func(b *testing.B) {
-        subtestDistributeAndFetch(b, 3, 100, overlap3, batchFetchBy10)
+        subtestDistributeAndFetch(b, 3, 100, fixedDelay, overlap3, batchFetchBy10)
     })
     b.Run("Overlap3-AllConcurrent", func(b *testing.B) {
-        subtestDistributeAndFetch(b, 3, 100, overlap3, fetchAllConcurrent)
+        subtestDistributeAndFetch(b, 3, 100, fixedDelay, overlap3, fetchAllConcurrent)
     })
     b.Run("Overlap3-BigBatch", func(b *testing.B) {
-        subtestDistributeAndFetch(b, 3, 100, overlap3, batchFetchAll)
+        subtestDistributeAndFetch(b, 3, 100, fixedDelay, overlap3, batchFetchAll)
     })
     b.Run("Overlap3-UnixfsFetch", func(b *testing.B) {
-        subtestDistributeAndFetch(b, 3, 100, overlap3, unixfsFileFetch)
+        subtestDistributeAndFetch(b, 3, 100, fixedDelay, overlap3, unixfsFileFetch)
     })
     b.Run("10Nodes-AllToAll-OneAtATime", func(b *testing.B) {
-        subtestDistributeAndFetch(b, 10, 100, allToAll, oneAtATime)
+        subtestDistributeAndFetch(b, 10, 100, fixedDelay, allToAll, oneAtATime)
     })
     b.Run("10Nodes-AllToAll-BatchFetchBy10", func(b *testing.B) {
-        subtestDistributeAndFetch(b, 10, 100, allToAll, batchFetchBy10)
+        subtestDistributeAndFetch(b, 10, 100, fixedDelay, allToAll, batchFetchBy10)
     })
     b.Run("10Nodes-AllToAll-BigBatch", func(b *testing.B) {
-        subtestDistributeAndFetch(b, 10, 100, allToAll, batchFetchAll)
+        subtestDistributeAndFetch(b, 10, 100, fixedDelay, allToAll, batchFetchAll)
     })
     b.Run("10Nodes-AllToAll-AllConcurrent", func(b *testing.B) {
-        subtestDistributeAndFetch(b, 10, 100, allToAll, fetchAllConcurrent)
+        subtestDistributeAndFetch(b, 10, 100, fixedDelay, allToAll, fetchAllConcurrent)
     })
     b.Run("10Nodes-AllToAll-UnixfsFetch", func(b *testing.B) {
-        subtestDistributeAndFetch(b, 10, 100, allToAll, unixfsFileFetch)
+        subtestDistributeAndFetch(b, 10, 100, fixedDelay, allToAll, unixfsFileFetch)
     })
     b.Run("10Nodes-OnePeerPerBlock-OneAtATime", func(b *testing.B) {
-        subtestDistributeAndFetch(b, 10, 100, onePeerPerBlock, oneAtATime)
+        subtestDistributeAndFetch(b, 10, 100, fixedDelay, onePeerPerBlock, oneAtATime)
     })
     b.Run("10Nodes-OnePeerPerBlock-BigBatch", func(b *testing.B) {
-        subtestDistributeAndFetch(b, 10, 100, onePeerPerBlock, batchFetchAll)
+        subtestDistributeAndFetch(b, 10, 100, fixedDelay, onePeerPerBlock, batchFetchAll)
     })
     b.Run("10Nodes-OnePeerPerBlock-UnixfsFetch", func(b *testing.B) {
-        subtestDistributeAndFetch(b, 10, 100, onePeerPerBlock, unixfsFileFetch)
+        subtestDistributeAndFetch(b, 10, 100, fixedDelay, onePeerPerBlock, unixfsFileFetch)
     })
     b.Run("200Nodes-AllToAll-BigBatch", func(b *testing.B) {
-        subtestDistributeAndFetch(b, 200, 20, allToAll, batchFetchAll)
+        subtestDistributeAndFetch(b, 200, 20, fixedDelay, allToAll, batchFetchAll)
     })
     out, _ := json.MarshalIndent(benchmarkLog, "", " ")
     ioutil.WriteFile("benchmark.json", out, 0666)
 }
 
-func subtestDistributeAndFetch(b *testing.B, numnodes, numblks int, df distFunc, ff fetchFunc) {
+const fastSpeed = 60 * time.Millisecond
+const mediumSpeed = 200 * time.Millisecond
+const slowSpeed = 800 * time.Millisecond
+const superSlowSpeed = 4000 * time.Millisecond
+const distribution = 20 * time.Millisecond
+
+func BenchmarkDupsManyNodesRealWorldNetwork(b *testing.B) {
+    fastNetworkDelayGenerator := tn.InternetLatencyDelayGenerator(
+        mediumSpeed-fastSpeed, slowSpeed-fastSpeed,
+        0.0, 0.0, distribution, nil)
+    fastNetworkDelay := delay.Delay(fastSpeed, fastNetworkDelayGenerator)
+    averageNetworkDelayGenerator := tn.InternetLatencyDelayGenerator(
+        mediumSpeed-fastSpeed, slowSpeed-fastSpeed,
+        0.3, 0.3, distribution, nil)
+    averageNetworkDelay := delay.Delay(fastSpeed, averageNetworkDelayGenerator)
+    slowNetworkDelayGenerator := tn.InternetLatencyDelayGenerator(
+        mediumSpeed-fastSpeed, superSlowSpeed-fastSpeed,
+        0.3, 0.3, distribution, nil)
+    slowNetworkDelay := delay.Delay(fastSpeed, slowNetworkDelayGenerator)
+
+    b.Run("200Nodes-AllToAll-BigBatch-FastNetwork", func(b *testing.B) {
+        subtestDistributeAndFetch(b, 300, 200, fastNetworkDelay, allToAll, batchFetchAll)
+    })
+    b.Run("200Nodes-AllToAll-BigBatch-AverageVariableSpeedNetwork", func(b *testing.B) {
+        subtestDistributeAndFetch(b, 300, 200, averageNetworkDelay, allToAll, batchFetchAll)
+    })
+    b.Run("200Nodes-AllToAll-BigBatch-SlowVariableSpeedNetwork", func(b *testing.B) {
+        subtestDistributeAndFetch(b, 300, 200, slowNetworkDelay, allToAll, batchFetchAll)
+    })
+}
+
+func subtestDistributeAndFetch(b *testing.B, numnodes, numblks int, d delay.D, df distFunc, ff fetchFunc) {
     start := time.Now()
-    net := tn.VirtualNetwork(mockrouting.NewServer(), delay.Fixed(10*time.Millisecond))
+    net := tn.VirtualNetwork(mockrouting.NewServer(), d)
     sg := NewTestSessionGenerator(net)
     defer sg.Close()
...
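Because the latency profile is now a parameter of subtestDistributeAndFetch, further network profiles can be benchmarked without touching the harness. The sketch below is illustrative only and is not part of this change: the function name, run name, node/block counts, and cluster percentages are invented; it reuses the helpers (subtestDistributeAndFetch, allToAll, batchFetchAll), the speed constants, tn.InternetLatencyDelayGenerator, and delay.Delay that appear in the diff above.

// Hypothetical sketch, not part of this PR: another latency profile for the
// same benchmark file, assuming the helpers and constants defined above.
func BenchmarkDupsMixedLatencySketch(b *testing.B) {
    // 50% of waits fall in the "medium" cluster, 20% in the "large" cluster,
    // and the remaining 30% stay near the 60ms base latency.
    mixedDelayGenerator := tn.InternetLatencyDelayGenerator(
        mediumSpeed-fastSpeed, superSlowSpeed-fastSpeed,
        0.5, 0.2, distribution, nil)
    mixedDelay := delay.Delay(fastSpeed, mixedDelayGenerator)
    b.Run("50Nodes-AllToAll-BigBatch-MixedLatencySketch", func(b *testing.B) {
        subtestDistributeAndFetch(b, 50, 100, mixedDelay, allToAll, batchFetchAll)
    })
}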
package bitswap

import (
    "math/rand"
    "time"

    "github.com/ipfs/go-ipfs-delay"
)

var sharedRNG = rand.New(rand.NewSource(time.Now().UnixNano()))

// InternetLatencyDelayGenerator generates three clusters of delays, typical of
// the kinds of peers you would encounter on the internet.
// Given a base delay time T, the wait time generated will be either:
// 1. A normal distribution around the base time
// 2. A normal distribution around the base time plus a "medium" delay
// 3. A normal distribution around the base time plus a "large" delay
// The sizes of the medium and large delays are set when the generator is
// constructed, along with the relative percentages of delays that fall into
// each of the three clusters and the standard deviation of the normal
// distribution.
// This can be used to generate a range of scenarios typical of latency
// distribution among peers on the internet.
func InternetLatencyDelayGenerator(
    mediumDelay time.Duration,
    largeDelay time.Duration,
    percentMedium float64,
    percentLarge float64,
    std time.Duration,
    rng *rand.Rand) delay.Generator {
    if rng == nil {
        rng = sharedRNG
    }

    return &internetLatencyDelayGenerator{
        mediumDelay:   mediumDelay,
        largeDelay:    largeDelay,
        percentLarge:  percentLarge,
        percentMedium: percentMedium,
        std:           std,
        rng:           rng,
    }
}

type internetLatencyDelayGenerator struct {
    mediumDelay   time.Duration
    largeDelay    time.Duration
    percentLarge  float64
    percentMedium float64
    std           time.Duration
    rng           *rand.Rand
}

func (d *internetLatencyDelayGenerator) NextWaitTime(t time.Duration) time.Duration {
    clusterDistribution := d.rng.Float64()
    baseDelay := time.Duration(d.rng.NormFloat64()*float64(d.std)) + t
    if clusterDistribution < d.percentLarge {
        return baseDelay + d.largeDelay
    } else if clusterDistribution < d.percentMedium+d.percentLarge {
        return baseDelay + d.mediumDelay
    }
    return baseDelay
}
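To make the doc comment above concrete, here is a minimal, hedged sketch of how the generator is consumed: it is wrapped into a delay.D via delay.Delay, exactly as the new benchmarks do, and then sampled. The function name and the specific numbers are illustrative; only InternetLatencyDelayGenerator, delay.Delay, and NextWaitTime come from this PR and the go-ipfs-delay package it imports.

package bitswap

import (
    "fmt"
    "time"

    "github.com/ipfs/go-ipfs-delay"
)

// sampleInternetLatency is an illustrative sketch, not part of this PR.
// With these parameters (the "average" profile from the benchmarks), roughly
// 40% of waits cluster near 60ms, 30% near 200ms, and 30% near 800ms, each
// spread with a 20ms standard deviation.
func sampleInternetLatency() {
    gen := InternetLatencyDelayGenerator(
        140*time.Millisecond, // extra delay for the "medium" cluster
        740*time.Millisecond, // extra delay for the "large" cluster
        0.3,                  // fraction of waits in the medium cluster
        0.3,                  // fraction of waits in the large cluster
        20*time.Millisecond,  // standard deviation within each cluster
        nil,                  // nil falls back to the shared package-level RNG
    )
    d := delay.Delay(60*time.Millisecond, gen)
    for i := 0; i < 5; i++ {
        fmt.Println(d.NextWaitTime()) // lands in one of the three clusters
    }
}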
package bitswap

import (
    "math"
    "math/rand"
    "testing"
    "time"
)

const testSeed = 99

func TestInternetLatencyDelayNextWaitTimeDistribution(t *testing.T) {
    initialValue := 1000 * time.Millisecond
    deviation := 100 * time.Millisecond
    mediumDelay := 1000 * time.Millisecond
    largeDelay := 3000 * time.Millisecond
    percentMedium := 0.2
    percentLarge := 0.4
    buckets := make(map[string]int)
    internetLatencyDistributionDelay := InternetLatencyDelayGenerator(
        mediumDelay,
        largeDelay,
        percentMedium,
        percentLarge,
        deviation,
        rand.New(rand.NewSource(testSeed)))

    buckets["fast"] = 0
    buckets["medium"] = 0
    buckets["slow"] = 0
    buckets["outside_1_deviation"] = 0

    // The strategy here is, rather than mocking randomness, to draw enough
    // samples to get approximately the distribution you'd expect.
    for i := 0; i < 10000; i++ {
        next := internetLatencyDistributionDelay.NextWaitTime(initialValue)
        if math.Abs((next - initialValue).Seconds()) <= deviation.Seconds() {
            buckets["fast"]++
        } else if math.Abs((next - initialValue - mediumDelay).Seconds()) <= deviation.Seconds() {
            buckets["medium"]++
        } else if math.Abs((next - initialValue - largeDelay).Seconds()) <= deviation.Seconds() {
            buckets["slow"]++
        } else {
            buckets["outside_1_deviation"]++
        }
    }
    totalInOneDeviation := float64(10000 - buckets["outside_1_deviation"])
    oneDeviationPercentage := totalInOneDeviation / 10000
    fastPercentageResult := float64(buckets["fast"]) / totalInOneDeviation
    mediumPercentageResult := float64(buckets["medium"]) / totalInOneDeviation
    slowPercentageResult := float64(buckets["slow"]) / totalInOneDeviation

    // See the 68-95-99.7 rule for normal distributions.
    if math.Abs(oneDeviationPercentage-0.6827) >= 0.1 {
        t.Fatal("Failed to distribute values normally based on standard deviation")
    }

    if math.Abs(fastPercentageResult+percentMedium+percentLarge-1) >= 0.1 {
        t.Fatal("Incorrect percentage of values distributed around fast delay time")
    }

    if math.Abs(mediumPercentageResult-percentMedium) >= 0.1 {
        t.Fatal("Incorrect percentage of values distributed around medium delay time")
    }

    if math.Abs(slowPercentageResult-percentLarge) >= 0.1 {
        t.Fatal("Incorrect percentage of values distributed around slow delay time")
    }
}
@@ -3,6 +3,7 @@ package bitswap
 import (
     "context"
     "errors"
+    "sort"
     "sync"
     "sync/atomic"
     "time"
@@ -24,6 +25,7 @@ var log = logging.Logger("bstestnet")
 
 func VirtualNetwork(rs mockrouting.Server, d delay.D) Network {
     return &network{
+        latencies:     make(map[peer.ID]map[peer.ID]time.Duration),
         clients:       make(map[peer.ID]*receiverQueue),
         delay:         d,
         routingserver: rs,
@@ -33,6 +35,7 @@ func VirtualNetwork(rs mockrouting.Server, d delay.D) Network {
 
 type network struct {
     mu            sync.Mutex
+    latencies     map[peer.ID]map[peer.ID]time.Duration
     clients       map[peer.ID]*receiverQueue
     routingserver mockrouting.Server
     delay         delay.D
@@ -87,6 +90,18 @@ func (n *network) SendMessage(
     n.mu.Lock()
     defer n.mu.Unlock()
 
+    latencies, ok := n.latencies[from]
+    if !ok {
+        latencies = make(map[peer.ID]time.Duration)
+        n.latencies[from] = latencies
+    }
+
+    latency, ok := latencies[to]
+    if !ok {
+        latency = n.delay.NextWaitTime()
+        latencies[to] = latency
+    }
+
     receiver, ok := n.clients[to]
     if !ok {
         return errors.New("cannot locate peer on network")
@@ -98,7 +113,7 @@ func (n *network) SendMessage(
     msg := &message{
         from:       from,
         msg:        mes,
-        shouldSend: time.Now().Add(n.delay.Get()),
+        shouldSend: time.Now().Add(latency),
     }
     receiver.enqueue(msg)
@@ -229,21 +244,38 @@ func (rq *receiverQueue) enqueue(m *message) {
     }
 }
 
+func (rq *receiverQueue) Swap(i, j int) {
+    rq.queue[i], rq.queue[j] = rq.queue[j], rq.queue[i]
+}
+
+func (rq *receiverQueue) Len() int {
+    return len(rq.queue)
+}
+
+func (rq *receiverQueue) Less(i, j int) bool {
+    return rq.queue[i].shouldSend.UnixNano() < rq.queue[j].shouldSend.UnixNano()
+}
+
 func (rq *receiverQueue) process() {
     for {
         rq.lk.Lock()
+        sort.Sort(rq)
         if len(rq.queue) == 0 {
             rq.active = false
             rq.lk.Unlock()
             return
         }
         m := rq.queue[0]
-        rq.queue = rq.queue[1:]
-        rq.lk.Unlock()
-
-        time.Sleep(time.Until(m.shouldSend))
-        atomic.AddUint64(&rq.receiver.stats.MessagesRecvd, 1)
-        rq.receiver.ReceiveMessage(context.TODO(), m.from, m.msg)
+        if time.Until(m.shouldSend).Seconds() < 0.1 {
+            rq.queue = rq.queue[1:]
+            rq.lk.Unlock()
+
+            time.Sleep(time.Until(m.shouldSend))
+            atomic.AddUint64(&rq.receiver.stats.MessagesRecvd, 1)
+            rq.receiver.ReceiveMessage(context.TODO(), m.from, m.msg)
+        } else {
+            rq.lk.Unlock()
+            time.Sleep(100 * time.Millisecond)
+        }
     }
 }
...
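Because each peer pair now gets its own cached latency, messages no longer become due in FIFO enqueue order. That is why receiverQueue gains Len, Less, and Swap, and why process() sorts the queue each pass and only sleeps on the head message once it is due within 100ms, otherwise parking briefly and re-checking. A standalone sketch of that ordering idea follows; the type and values are illustrative, not from the PR, only the sort.Interface pattern matches the diff above.

package main

import (
    "fmt"
    "sort"
    "time"
)

// pendingQueue mirrors the sort.Interface approach used by receiverQueue:
// order pending messages by the time they should be delivered.
type pendingQueue []time.Time

func (q pendingQueue) Len() int           { return len(q) }
func (q pendingQueue) Swap(i, j int)      { q[i], q[j] = q[j], q[i] }
func (q pendingQueue) Less(i, j int) bool { return q[i].Before(q[j]) }

func main() {
    now := time.Now()
    // Enqueued in arrival order, but with different per-peer latencies applied.
    q := pendingQueue{
        now.Add(800 * time.Millisecond),
        now.Add(60 * time.Millisecond),
        now.Add(200 * time.Millisecond),
    }
    sort.Sort(q)
    // After sorting, the soonest-due message sits at the head of the queue.
    fmt.Println(q[0].Sub(now).Round(time.Millisecond)) // ~60ms
}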