Commit 43bcb2f1 authored by Marten Seemann's avatar Marten Seemann

make the initial stream receive window configurable

While the initial window size is defined by the yamux specification
(256 kB), we can just send a window update as soon as a stream is
opened / accepted. In fact, that's exactly what we do already.
parent 42482e31
...@@ -31,6 +31,10 @@ type Config struct { ...@@ -31,6 +31,10 @@ type Config struct {
// an expectation that things will move along quickly. // an expectation that things will move along quickly.
ConnectionWriteTimeout time.Duration ConnectionWriteTimeout time.Duration
// InitialStreamWindowSize is used to control the initial
// window size that we allow for a stream.
InitialStreamWindowSize uint32
// MaxStreamWindowSize is used to control the maximum // MaxStreamWindowSize is used to control the maximum
// window size that we allow for a stream. // window size that we allow for a stream.
MaxStreamWindowSize uint32 MaxStreamWindowSize uint32
...@@ -56,16 +60,17 @@ type Config struct { ...@@ -56,16 +60,17 @@ type Config struct {
// DefaultConfig is used to return a default configuration // DefaultConfig is used to return a default configuration
func DefaultConfig() *Config { func DefaultConfig() *Config {
return &Config{ return &Config{
AcceptBacklog: 256, AcceptBacklog: 256,
PingBacklog: 32, PingBacklog: 32,
EnableKeepAlive: true, EnableKeepAlive: true,
KeepAliveInterval: 30 * time.Second, KeepAliveInterval: 30 * time.Second,
ConnectionWriteTimeout: 10 * time.Second, ConnectionWriteTimeout: 10 * time.Second,
MaxStreamWindowSize: maxStreamWindow, InitialStreamWindowSize: initialStreamWindow,
LogOutput: os.Stderr, MaxStreamWindowSize: maxStreamWindow,
ReadBufSize: 4096, LogOutput: os.Stderr,
MaxMessageSize: 64 * 1024, ReadBufSize: 4096,
WriteCoalesceDelay: 100 * time.Microsecond, MaxMessageSize: 64 * 1024,
WriteCoalesceDelay: 100 * time.Microsecond,
} }
} }
...@@ -77,8 +82,11 @@ func VerifyConfig(config *Config) error { ...@@ -77,8 +82,11 @@ func VerifyConfig(config *Config) error {
if config.KeepAliveInterval == 0 { if config.KeepAliveInterval == 0 {
return fmt.Errorf("keep-alive interval must be positive") return fmt.Errorf("keep-alive interval must be positive")
} }
if config.MaxStreamWindowSize < initialStreamWindow { if config.InitialStreamWindowSize < initialStreamWindow {
return errors.New("MaxStreamWindowSize must be larger than the initialStreamWindow (256 kB)") return errors.New("InitialStreamWindowSize must be larger or equal 256 kB")
}
if config.MaxStreamWindowSize < config.InitialStreamWindowSize {
return errors.New("MaxStreamWindowSize must be larger than the InitialStreamWindowSize")
} }
if config.MaxMessageSize < 1024 { if config.MaxMessageSize < 1024 {
return fmt.Errorf("MaxMessageSize must be greater than a kilobyte") return fmt.Errorf("MaxMessageSize must be greater than a kilobyte")
......
...@@ -6,6 +6,7 @@ import ( ...@@ -6,6 +6,7 @@ import (
"fmt" "fmt"
"io" "io"
"io/ioutil" "io/ioutil"
"math/rand"
"net" "net"
"reflect" "reflect"
"runtime" "runtime"
...@@ -1683,3 +1684,51 @@ func TestReadDeadlineInterrupt(t *testing.T) { ...@@ -1683,3 +1684,51 @@ func TestReadDeadlineInterrupt(t *testing.T) {
} }
} }
} }
// Make sure that a transfer doesn't stall, no matter what values the peers use for their InitialStreamWindow.
func TestInitialStreamWindow(t *testing.T) {
for i := 0; i < 10; i++ {
const (
maxWindow = 5 * initialStreamWindow
transferSize = 10 * maxWindow
)
rand.Seed(time.Now().UnixNano())
randomUint32 := func(min, max uint32) uint32 { return uint32(rand.Int63n(int64(max-min))) + min }
cconf := DefaultConfig()
cconf.InitialStreamWindowSize = randomUint32(initialStreamWindow, maxWindow)
sconf := DefaultConfig()
sconf.InitialStreamWindowSize = randomUint32(initialStreamWindow, maxWindow)
conn1, conn2 := testConn()
client, _ := Client(conn1, cconf)
server, _ := Server(conn2, sconf)
errChan := make(chan error, 1)
go func() {
defer close(errChan)
str, err := client.OpenStream(context.Background())
if err != nil {
errChan <- err
return
}
defer str.Close()
if _, err := str.Write(make([]byte, transferSize)); err != nil {
errChan <- err
return
}
}()
str, err := server.AcceptStream()
if err != nil {
t.Fatal(err)
}
data, err := ioutil.ReadAll(str)
if err != nil {
t.Fatal(err)
}
if uint32(len(data)) != transferSize {
t.Fatalf("expected %d bytes to be transferred, got %d", transferSize, len(data))
}
}
}
...@@ -59,11 +59,14 @@ func newStream(session *Session, id uint32, state streamState) *Stream { ...@@ -59,11 +59,14 @@ func newStream(session *Session, id uint32, state streamState) *Stream {
sendWindow: initialStreamWindow, sendWindow: initialStreamWindow,
readDeadline: makePipeDeadline(), readDeadline: makePipeDeadline(),
writeDeadline: makePipeDeadline(), writeDeadline: makePipeDeadline(),
recvBuf: newSegmentedBuffer(initialStreamWindow), // Initialize the recvBuf with initialStreamWindow, not config.InitialStreamWindowSize.
recvWindow: initialStreamWindow, // The peer isn't allowed to send more data than initialStreamWindow until we've sent
epochStart: time.Now(), // the first window update (which will grant it up to config.InitialStreamWindowSize).
recvNotifyCh: make(chan struct{}, 1), recvBuf: newSegmentedBuffer(initialStreamWindow),
sendNotifyCh: make(chan struct{}, 1), recvWindow: session.config.InitialStreamWindowSize,
epochStart: time.Now(),
recvNotifyCh: make(chan struct{}, 1),
sendNotifyCh: make(chan struct{}, 1),
} }
return s return s
} }
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment