track writes

parent 87755835
......@@ -9,14 +9,26 @@ import (
"testing"
"time"
"go.uber.org/atomic"
"google.golang.org/grpc/benchmark/latency"
)
// WriteTrackedConn provides a wrapper for tracking how many write calls are made to a network connection.
type WriteTrackedConn struct {
net.Conn
WriteCount atomic.Uint32
}
func (c *WriteTrackedConn) Write(b []byte) (n int, err error) {
c.WriteCount.Inc()
return c.Conn.Write(b)
}
// NetworkTestFunc is a benchmark function under test by `FindNetworkLimit`
type NetworkTestFunc func(b *testing.B, n1, n2 net.Conn)
// ConnectionForNetwork generates a pair of network connections with a specified latency.
func ConnectionForNetwork(n *latency.Network) (n1, n2 net.Conn, err error) {
func ConnectionForNetwork(n *latency.Network) (n1, n2 *WriteTrackedConn, err error) {
var wg sync.WaitGroup
wg.Add(1)
......@@ -28,8 +40,9 @@ func ConnectionForNetwork(n *latency.Network) (n1, n2 net.Conn, err error) {
slowListener := n.Listener(listener)
go func() {
defer wg.Done()
n2, _ = slowListener.Accept()
ac, _ := slowListener.Accept()
slowListener.Close()
n2 = &WriteTrackedConn{ac, atomic.Uint32{}}
return
}()
baseDialer := net.Dialer{}
......@@ -37,10 +50,11 @@ func ConnectionForNetwork(n *latency.Network) (n1, n2 net.Conn, err error) {
ctx, cancel := context.WithTimeout(context.Background(), time.Second)
defer cancel()
n1, err = dialer(ctx, "tcp4", slowListener.Addr().String())
conn, err := dialer(ctx, "tcp4", slowListener.Addr().String())
if err != nil {
return
}
n1 = &WriteTrackedConn{conn, atomic.Uint32{}}
wg.Wait()
return
......@@ -55,11 +69,12 @@ func benchWithNet(testFunc NetworkTestFunc, n *latency.Network) func(*testing.B)
defer n1.Close()
defer n2.Close()
testFunc(sb, n1, n2)
sb.ReportMetric(float64(n1.WriteCount.Load()), "writes")
}
}
// FindNetworkLimit benchmarks a function to analyze CPU and network relationship
func FindNetworkLimit(testFunc NetworkTestFunc, fractionOfMax float64) (int, time.Duration, error) {
func FindNetworkLimit(testFunc NetworkTestFunc) (int, error) {
network := latency.Network{
Kbps: 0,
Latency: 0,
......@@ -69,31 +84,24 @@ func FindNetworkLimit(testFunc NetworkTestFunc, fractionOfMax float64) (int, tim
result := testing.Benchmark(wrapperFunc)
if result.N < 1 {
return 0, 0, fmt.Errorf("failed to run benchmark")
return 0, fmt.Errorf("failed to run benchmark")
}
max := (float64(result.Bytes) * float64(result.N) / 1e6) / result.T.Seconds()
fmt.Printf("CPU Bound Limit: %s\n", result)
current := max
network.Latency = 500 * time.Microsecond
for current > max*fractionOfMax {
fmt.Printf("Limit with no network latency: %s\n", result)
last := 1.0
network.Latency = 30 * time.Millisecond
network.Kbps = 1024 * 1 // 1 Mbps
result = testing.Benchmark(wrapperFunc)
current := (float64(result.Bytes) * float64(result.N) / 1e6) / result.T.Seconds()
for current/(2*last) > .1 {
network.Latency *= 2
result = testing.Benchmark(wrapperFunc)
last = current
current = (float64(result.Bytes) * float64(result.N) / 1e6) / result.T.Seconds()
}
fmt.Printf("Latency Bound Limit: %s\n", network.Latency)
network.Kbps = 1024 * 100 // 100Mbps
network.Latency /= 2
for current > max*fractionOfMax {
network.Kbps /= 2
result = testing.Benchmark(wrapperFunc)
current = (float64(result.Bytes) * float64(result.N) / 1e6) / result.T.Seconds()
}
fmt.Printf("Bandwidth Bound Limit: %dKbps\n", network.Kbps)
fmt.Printf("Network Bound Limit: %s\n", result)
fmt.Printf("At 30ms latency, bandwidth can saturate: %dKbps\n", network.Kbps)
return network.Kbps, network.Latency, nil
return network.Kbps, nil
}
// ParallelismSlowdown tracks how much overhead is incurred on a ntework bound function when parallelism contentention
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment