Skip to content
GitLab
Projects
Groups
Snippets
Help
Loading...
Help
What's new
10
Help
Support
Community forum
Keyboard shortcuts
?
Submit feedback
Contribute to GitLab
Sign in
Toggle navigation
Open sidebar
p2p
go-yamux
Commits
f6e177c8
Commit
f6e177c8
authored
Apr 14, 2021
by
Marten Seemann
Browse files
Options
Browse Files
Download
Email Patches
Plain Diff
increase the receive window size if we're sending updates to frequently
parent
9b04322f
Changes
6
Hide whitespace changes
Inline
Side-by-side
Showing
6 changed files
with
134 additions
and
28 deletions
+134
-28
const.go
const.go
+2
-1
mux.go
mux.go
+18
-12
session.go
session.go
+16
-0
session_norace_test.go
session_norace_test.go
+1
-1
session_test.go
session_test.go
+74
-7
stream.go
stream.go
+23
-7
No files found.
const.go
View file @
f6e177c8
...
@@ -114,7 +114,8 @@ const (
...
@@ -114,7 +114,8 @@ const (
const
(
const
(
// initialStreamWindow is the initial stream window size
// initialStreamWindow is the initial stream window size
initialStreamWindow
uint32
=
256
*
1024
initialStreamWindow
uint32
=
64
*
1024
maxStreamWindow
uint32
=
16
*
1024
*
1024
)
)
const
(
const
(
...
...
mux.go
View file @
f6e177c8
package
yamux
package
yamux
import
(
import
(
"errors"
"fmt"
"fmt"
"io"
"io"
"net"
"net"
...
@@ -30,6 +31,10 @@ type Config struct {
...
@@ -30,6 +31,10 @@ type Config struct {
// an expectation that things will move along quickly.
// an expectation that things will move along quickly.
ConnectionWriteTimeout
time
.
Duration
ConnectionWriteTimeout
time
.
Duration
// InitialStreamWindowSize is used to control the initial
// window size that we allow for a stream.
InitialStreamWindowSize
uint32
// MaxStreamWindowSize is used to control the maximum
// MaxStreamWindowSize is used to control the maximum
// window size that we allow for a stream.
// window size that we allow for a stream.
MaxStreamWindowSize
uint32
MaxStreamWindowSize
uint32
...
@@ -55,16 +60,17 @@ type Config struct {
...
@@ -55,16 +60,17 @@ type Config struct {
// DefaultConfig is used to return a default configuration
// DefaultConfig is used to return a default configuration
func
DefaultConfig
()
*
Config
{
func
DefaultConfig
()
*
Config
{
return
&
Config
{
return
&
Config
{
AcceptBacklog
:
256
,
AcceptBacklog
:
256
,
PingBacklog
:
32
,
PingBacklog
:
32
,
EnableKeepAlive
:
true
,
EnableKeepAlive
:
true
,
KeepAliveInterval
:
30
*
time
.
Second
,
KeepAliveInterval
:
30
*
time
.
Second
,
ConnectionWriteTimeout
:
10
*
time
.
Second
,
ConnectionWriteTimeout
:
10
*
time
.
Second
,
MaxStreamWindowSize
:
initialStreamWindow
,
InitialStreamWindowSize
:
initialStreamWindow
,
LogOutput
:
os
.
Stderr
,
MaxStreamWindowSize
:
maxStreamWindow
,
ReadBufSize
:
4096
,
LogOutput
:
os
.
Stderr
,
MaxMessageSize
:
64
*
1024
,
ReadBufSize
:
4096
,
WriteCoalesceDelay
:
100
*
time
.
Microsecond
,
MaxMessageSize
:
64
*
1024
,
WriteCoalesceDelay
:
100
*
time
.
Microsecond
,
}
}
}
}
...
@@ -76,8 +82,8 @@ func VerifyConfig(config *Config) error {
...
@@ -76,8 +82,8 @@ func VerifyConfig(config *Config) error {
if
config
.
KeepAliveInterval
==
0
{
if
config
.
KeepAliveInterval
==
0
{
return
fmt
.
Errorf
(
"keep-alive interval must be positive"
)
return
fmt
.
Errorf
(
"keep-alive interval must be positive"
)
}
}
if
config
.
MaxStreamWindowSize
<
i
nitialStreamWindow
{
if
config
.
MaxStreamWindowSize
<
config
.
I
nitialStreamWindow
Size
{
return
fmt
.
Errorf
(
"MaxStreamWindowSize must be larger than
%d"
,
i
nitialStreamWindow
)
return
errors
.
New
(
"MaxStreamWindowSize must be larger than
the I
nitialStreamWindow
Size"
)
}
}
if
config
.
MaxMessageSize
<
1024
{
if
config
.
MaxMessageSize
<
1024
{
return
fmt
.
Errorf
(
"MaxMessageSize must be greater than a kilobyte"
)
return
fmt
.
Errorf
(
"MaxMessageSize must be greater than a kilobyte"
)
...
...
session.go
View file @
f6e177c8
...
@@ -21,6 +21,8 @@ import (
...
@@ -21,6 +21,8 @@ import (
// Session is used to wrap a reliable ordered connection and to
// Session is used to wrap a reliable ordered connection and to
// multiplex it into multiple streams.
// multiplex it into multiple streams.
type
Session
struct
{
type
Session
struct
{
rtt
int64
// to be accessed atomically, in nanoseconds
// remoteGoAway indicates the remote side does
// remoteGoAway indicates the remote side does
// not want futher connections. Must be first for alignment.
// not want futher connections. Must be first for alignment.
remoteGoAway
int32
remoteGoAway
int32
...
@@ -129,6 +131,7 @@ func newSession(config *Config, conn net.Conn, client bool, readBuf int) *Sessio
...
@@ -129,6 +131,7 @@ func newSession(config *Config, conn net.Conn, client bool, readBuf int) *Sessio
}
}
go
s
.
recv
()
go
s
.
recv
()
go
s
.
send
()
go
s
.
send
()
go
s
.
measureRTT
()
return
s
return
s
}
}
...
@@ -291,6 +294,19 @@ func (s *Session) goAway(reason uint32) header {
...
@@ -291,6 +294,19 @@ func (s *Session) goAway(reason uint32) header {
return
hdr
return
hdr
}
}
func
(
s
*
Session
)
measureRTT
()
{
rtt
,
err
:=
s
.
Ping
()
if
err
!=
nil
{
return
}
atomic
.
StoreInt64
(
&
s
.
rtt
,
rtt
.
Nanoseconds
())
}
// 0 if we don't yet have a measurement
func
(
s
*
Session
)
getRTT
()
time
.
Duration
{
return
time
.
Duration
(
atomic
.
LoadInt64
(
&
s
.
rtt
))
}
// Ping is used to measure the RTT response time
// Ping is used to measure the RTT response time
func
(
s
*
Session
)
Ping
()
(
dur
time
.
Duration
,
err
error
)
{
func
(
s
*
Session
)
Ping
()
(
dur
time
.
Duration
,
err
error
)
{
// Prepare a ping.
// Prepare a ping.
...
...
session_norace_test.go
View file @
f6e177c8
...
@@ -159,7 +159,7 @@ func TestLargeWindow(t *testing.T) {
...
@@ -159,7 +159,7 @@ func TestLargeWindow(t *testing.T) {
if
err
!=
nil
{
if
err
!=
nil
{
t
.
Fatal
(
err
)
t
.
Fatal
(
err
)
}
}
buf
:=
make
([]
byte
,
conf
.
Max
StreamWindow
Size
)
buf
:=
make
([]
byte
,
initial
StreamWindow
)
n
,
err
:=
stream
.
Write
(
buf
)
n
,
err
:=
stream
.
Write
(
buf
)
if
err
!=
nil
{
if
err
!=
nil
{
t
.
Fatalf
(
"err: %v"
,
err
)
t
.
Fatalf
(
"err: %v"
,
err
)
...
...
session_test.go
View file @
f6e177c8
...
@@ -1165,7 +1165,7 @@ func TestSession_PartialReadWindowUpdate(t *testing.T) {
...
@@ -1165,7 +1165,7 @@ func TestSession_PartialReadWindowUpdate(t *testing.T) {
wg
.
Add
(
1
)
wg
.
Add
(
1
)
// Choose a huge flood size that we know will result in a window update.
// Choose a huge flood size that we know will result in a window update.
flood
:=
int64
(
client
.
config
.
Max
StreamWindow
Size
)
flood
:=
int64
(
initial
StreamWindow
)
var
wr
*
Stream
var
wr
*
Stream
// The server will accept a new stream and then flood data to it.
// The server will accept a new stream and then flood data to it.
...
@@ -1180,8 +1180,8 @@ func TestSession_PartialReadWindowUpdate(t *testing.T) {
...
@@ -1180,8 +1180,8 @@ func TestSession_PartialReadWindowUpdate(t *testing.T) {
}
}
sendWindow
:=
atomic
.
LoadUint32
(
&
wr
.
sendWindow
)
sendWindow
:=
atomic
.
LoadUint32
(
&
wr
.
sendWindow
)
if
sendWindow
!=
client
.
config
.
Max
StreamWindow
Size
{
if
sendWindow
!=
initial
StreamWindow
{
t
.
Errorf
(
"sendWindow: exp=%d, got=%d"
,
client
.
config
.
Max
StreamWindowSize
,
sendWindow
)
t
.
Errorf
(
"sendWindow: exp=%d, got=%d"
,
client
.
config
.
Initial
StreamWindowSize
,
sendWindow
)
return
return
}
}
...
@@ -1215,8 +1215,9 @@ func TestSession_PartialReadWindowUpdate(t *testing.T) {
...
@@ -1215,8 +1215,9 @@ func TestSession_PartialReadWindowUpdate(t *testing.T) {
}
}
var
(
var
(
exp
=
uint32
(
flood
/
2
)
expWithoutWindowIncrease
=
uint32
(
flood
/
2
)
sendWindow
uint32
expWithWindowIncrease
=
uint32
(
flood
)
sendWindow
uint32
)
)
// This test is racy. Wait a short period, then longer and longer. At
// This test is racy. Wait a short period, then longer and longer. At
...
@@ -1224,13 +1225,79 @@ func TestSession_PartialReadWindowUpdate(t *testing.T) {
...
@@ -1224,13 +1225,79 @@ func TestSession_PartialReadWindowUpdate(t *testing.T) {
for
i
:=
1
;
i
<
15
;
i
++
{
for
i
:=
1
;
i
<
15
;
i
++
{
time
.
Sleep
(
time
.
Duration
(
i
*
i
)
*
time
.
Millisecond
)
time
.
Sleep
(
time
.
Duration
(
i
*
i
)
*
time
.
Millisecond
)
sendWindow
=
atomic
.
LoadUint32
(
&
wr
.
sendWindow
)
sendWindow
=
atomic
.
LoadUint32
(
&
wr
.
sendWindow
)
if
sendWindow
==
exp
{
if
sendWindow
==
exp
WithoutWindowIncrease
||
sendWindow
==
expWithWindowIncrease
{
return
return
}
}
}
}
t
.
Errorf
(
"sendWindow: exp=%d, got=%d"
,
exp
,
sendWindow
)
t
.
Errorf
(
"sendWindow: exp=%d
or %d
, got=%d"
,
exp
WithoutWindowIncrease
,
expWithWindowIncrease
,
sendWindow
)
}
}
// func TestSession_WindowAutoSizing(t *testing.T) {
// const initialWindow uint32 = 10
// conf := testConfNoKeepAlive()
// conf.InitialStreamWindowSize = initialWindow
// client, server := testClientServerConfig(conf)
// defer client.Close()
// defer server.Close()
// receiveAndConsume := func(str *Stream, size uint32) {
// if _, err := str.Read(make([]byte, size)); err != nil {
// t.Fatal(err)
// }
// }
// clientStr, err := client.OpenStream(context.Background())
// if err != nil {
// t.Fatal(err)
// }
// serverStr, err := server.AcceptStream()
// if err != nil {
// t.Fatal(err)
// }
// const rtt = 20 * time.Millisecond
// t.Run("finding the window size", func(t *testing.T) {
// // Consume a maximum of 1234 bytes per RTT.
// // We expect the window to be scaled such that we send one update every 2 RTTs.
// go func() {
// for {
// serverStr.Write(make([]byte, 100))
// }
// }()
// var counter int
// ticker := time.NewTicker(rtt)
// for range ticker.C {
// receiveAndConsume(clientStr, 1234)
// counter++
// if counter > 25 {
// break
// }
// }
// fmt.Println(clientStr.recvWindow)
// })
// // t.Run("capping the window size", func(t *testing.T) {
// // const maxWindow = 78 * initialWindow
// // buf := newSegmentedBuffer(initialWindow, maxWindow, func() time.Duration { return rtt })
// // start := time.Now()
// // // Consume a maximum of 1234 bytes per RTT.
// // // We expect the window to be scaled such that we send one update every 2 RTTs.
// // now := start
// // delta := initialWindow
// // for i := 0; i < 100; i++ {
// // now = now.Add(rtt)
// // receiveAndConsume(&buf, delta)
// // grow, d := buf.GrowTo(false, now)
// // if grow {
// // delta = d
// // }
// // }
// // if buf.windowSize != maxWindow {
// // t.Fatalf("expected the window size to be at max (%d), got %d", maxWindow, buf.windowSize)
// // }
// // })
// }
func
TestSession_sendMsg_Timeout
(
t
*
testing
.
T
)
{
func
TestSession_sendMsg_Timeout
(
t
*
testing
.
T
)
{
client
,
server
:=
testClientServerConfig
(
testConfNoKeepAlive
())
client
,
server
:=
testClientServerConfig
(
testConfNoKeepAlive
())
defer
client
.
Close
()
defer
client
.
Close
()
...
...
stream.go
View file @
f6e177c8
...
@@ -2,6 +2,7 @@ package yamux
...
@@ -2,6 +2,7 @@ package yamux
import
(
import
(
"io"
"io"
"math"
"sync"
"sync"
"sync/atomic"
"sync/atomic"
"time"
"time"
...
@@ -33,6 +34,9 @@ type Stream struct {
...
@@ -33,6 +34,9 @@ type Stream struct {
id
uint32
id
uint32
session
*
Session
session
*
Session
recvWindow
uint32
epochStart
time
.
Time
state
streamState
state
streamState
writeState
,
readState
halfStreamState
writeState
,
readState
halfStreamState
stateLock
sync
.
Mutex
stateLock
sync
.
Mutex
...
@@ -48,6 +52,7 @@ type Stream struct {
...
@@ -48,6 +52,7 @@ type Stream struct {
// newStream is used to construct a new stream within
// newStream is used to construct a new stream within
// a given session for an ID
// a given session for an ID
func
newStream
(
session
*
Session
,
id
uint32
,
state
streamState
)
*
Stream
{
func
newStream
(
session
*
Session
,
id
uint32
,
state
streamState
)
*
Stream
{
initialStreamWindow
:=
session
.
config
.
InitialStreamWindowSize
s
:=
&
Stream
{
s
:=
&
Stream
{
id
:
id
,
id
:
id
,
session
:
session
,
session
:
session
,
...
@@ -56,6 +61,8 @@ func newStream(session *Session, id uint32, state streamState) *Stream {
...
@@ -56,6 +61,8 @@ func newStream(session *Session, id uint32, state streamState) *Stream {
readDeadline
:
makePipeDeadline
(),
readDeadline
:
makePipeDeadline
(),
writeDeadline
:
makePipeDeadline
(),
writeDeadline
:
makePipeDeadline
(),
recvBuf
:
newSegmentedBuffer
(
initialStreamWindow
),
recvBuf
:
newSegmentedBuffer
(
initialStreamWindow
),
recvWindow
:
initialStreamWindow
,
epochStart
:
time
.
Now
(),
recvNotifyCh
:
make
(
chan
struct
{},
1
),
recvNotifyCh
:
make
(
chan
struct
{},
1
),
sendNotifyCh
:
make
(
chan
struct
{},
1
),
sendNotifyCh
:
make
(
chan
struct
{},
1
),
}
}
...
@@ -202,16 +209,25 @@ func (s *Stream) sendWindowUpdate() error {
...
@@ -202,16 +209,25 @@ func (s *Stream) sendWindowUpdate() error {
// Determine the flags if any
// Determine the flags if any
flags
:=
s
.
sendFlags
()
flags
:=
s
.
sendFlags
()
// Determine the delta update
// Update the receive window.
max
:=
s
.
session
.
config
.
MaxStreamWindowSize
needed
,
delta
:=
s
.
recvBuf
.
GrowTo
(
s
.
recvWindow
,
flags
!=
0
)
// Update our window
needed
,
delta
:=
s
.
recvBuf
.
GrowTo
(
max
,
flags
!=
0
)
if
!
needed
{
if
!
needed
{
return
nil
return
nil
}
}
now
:=
time
.
Now
()
// Send the header
if
rtt
:=
s
.
session
.
getRTT
();
rtt
>
0
&&
now
.
Sub
(
s
.
epochStart
)
<
rtt
*
4
{
var
recvWindow
uint32
if
s
.
recvWindow
>
math
.
MaxUint32
/
2
{
recvWindow
=
min
(
math
.
MaxUint32
,
s
.
session
.
config
.
MaxStreamWindowSize
)
}
else
{
recvWindow
=
min
(
s
.
recvWindow
*
2
,
s
.
session
.
config
.
MaxStreamWindowSize
)
}
if
recvWindow
>
s
.
recvWindow
{
s
.
recvWindow
=
recvWindow
_
,
delta
=
s
.
recvBuf
.
GrowTo
(
s
.
recvWindow
,
true
)
}
}
s
.
epochStart
=
now
hdr
:=
encode
(
typeWindowUpdate
,
flags
,
s
.
id
,
delta
)
hdr
:=
encode
(
typeWindowUpdate
,
flags
,
s
.
id
,
delta
)
return
s
.
session
.
sendMsg
(
hdr
,
nil
,
nil
)
return
s
.
session
.
sendMsg
(
hdr
,
nil
,
nil
)
}
}
...
...
Write
Preview
Markdown
is supported
0%
Try again
or
attach a new file
.
Attach a file
Cancel
You are about to add
0
people
to the discussion. Proceed with caution.
Finish editing this message first!
Cancel
Please
register
or
sign in
to comment