Unverified Commit d913bbd8 authored by Steven Allen's avatar Steven Allen Committed by GitHub

Merge pull request #22 from libp2p/feat/ctrlbypass

less head-of-line blocking for control messages
parents 97856b4f dd0e1865
......@@ -47,6 +47,10 @@ type Config struct {
// MaxMessageSize is the maximum size of a message that we'll send on a
// stream. This ensures that a single stream doesn't hog a connection.
MaxMessageSize uint32
// SendQueueSize is the maximum number of messages we'll keep in the local
// send queue before applying back pressure to writers.
SendQueueSize uint32
}
// DefaultConfig is used to return a default configuration
......@@ -61,6 +65,7 @@ func DefaultConfig() *Config {
ReadBufSize: 4096,
MaxMessageSize: 64 * 1024, // Means 64KiB/10s = 52kbps minimum speed.
WriteCoalesceDelay: 100 * time.Microsecond,
SendQueueSize: 64,
}
}
......
......@@ -14,7 +14,7 @@ import (
"sync/atomic"
"time"
"github.com/libp2p/go-buffer-pool"
pool "github.com/libp2p/go-buffer-pool"
)
// Session is used to wrap a reliable ordered connection and to
......@@ -67,6 +67,9 @@ type Session struct {
// sendCh is used to send messages
sendCh chan []byte
// sendCtrlCh is used to send control messages (skipping the normal send queue)
sendCtrlCh chan []byte
// recvDoneCh is closed when recv() exits to avoid a race
// between stream registration and stream shutdown
recvDoneCh chan struct{}
......@@ -109,7 +112,8 @@ func newSession(config *Config, conn net.Conn, client bool, readBuf int) *Sessio
inflight: make(map[uint32]struct{}),
synCh: make(chan struct{}, config.AcceptBacklog),
acceptCh: make(chan *Stream, config.AcceptBacklog),
sendCh: make(chan []byte, 64),
sendCh: make(chan []byte, config.SendQueueSize),
sendCtrlCh: make(chan []byte, 16),
recvDoneCh: make(chan struct{}),
sendDoneCh: make(chan struct{}),
shutdownCh: make(chan struct{}),
......@@ -270,7 +274,7 @@ func (s *Session) exitErr(err error) {
// GoAway can be used to prevent accepting further
// connections. It does not close the underlying conn.
func (s *Session) GoAway() error {
return s.sendMsg(s.goAway(goAwayNormal), nil, nil)
return s.sendMsg(s.goAway(goAwayNormal), nil, nil, true)
}
// goAway is used to send a goAway message
......@@ -294,7 +298,7 @@ func (s *Session) Ping() (time.Duration, error) {
// Send the ping request
hdr := encode(typePing, flagSYN, 0, id)
if err := s.sendMsg(hdr, nil, nil); err != nil {
if err := s.sendMsg(hdr, nil, nil, true); err != nil {
return 0, err
}
......@@ -374,7 +378,7 @@ func (s *Session) extendKeepalive() {
}
// send sends the header and body.
func (s *Session) sendMsg(hdr header, body []byte, deadline <-chan struct{}) error {
func (s *Session) sendMsg(hdr header, body []byte, deadline <-chan struct{}, control bool) error {
select {
case <-s.shutdownCh:
return s.shutdownErr
......@@ -386,11 +390,17 @@ func (s *Session) sendMsg(hdr header, body []byte, deadline <-chan struct{}) err
copy(buf[:headerSize], hdr[:])
copy(buf[headerSize:], body)
var sendCh chan []byte
if control {
sendCh = s.sendCtrlCh
} else {
sendCh = s.sendCh
}
select {
case <-s.shutdownCh:
pool.Put(buf)
return s.shutdownErr
case s.sendCh <- buf:
case sendCh <- buf:
return nil
case <-deadline:
pool.Put(buf)
......@@ -446,38 +456,65 @@ func (s *Session) sendLoop() error {
default:
}
var buf []byte
// Preferentially use control channel.
select {
case buf = <-s.sendCtrlCh:
goto SEND
case <-s.shutdownCh:
return nil
default:
}
// Flushes at least once every 100 microseconds unless we're
// constantly writing.
var buf []byte
select {
case buf = <-s.sendCh:
goto SEND
case buf = <-s.sendCtrlCh:
goto SEND
case <-s.shutdownCh:
return nil
default:
}
select {
case buf = <-s.sendCh:
goto SEND
case buf = <-s.sendCtrlCh:
goto SEND
case <-s.shutdownCh:
return nil
case <-writeTimeoutCh:
}
if err := writer.Flush(); err != nil {
if os.IsTimeout(err) {
err = ErrConnectionWriteTimeout
}
return err
}
// Preferentially use control channel.
select {
case buf = <-s.sendCtrlCh:
case <-s.shutdownCh:
return nil
default:
select {
case buf = <-s.sendCh:
case buf = <-s.sendCtrlCh:
case <-s.shutdownCh:
return nil
case <-writeTimeoutCh:
if err := writer.Flush(); err != nil {
if os.IsTimeout(err) {
err = ErrConnectionWriteTimeout
}
return err
}
select {
case buf = <-s.sendCh:
case <-s.shutdownCh:
return nil
}
if writeTimeout != nil {
writeTimeout.Reset(s.config.WriteCoalesceDelay)
}
}
}
if writeTimeout != nil {
writeTimeout.Reset(s.config.WriteCoalesceDelay)
}
SEND:
if err := extendWriteDeadline(); err != nil {
pool.Put(buf)
return err
......@@ -582,7 +619,7 @@ func (s *Session) handleStreamMessage(hdr header) error {
// Check if this is a window update
if hdr.MsgType() == typeWindowUpdate {
if err := stream.incrSendWindow(hdr, flags); err != nil {
if sendErr := s.sendMsg(s.goAway(goAwayProtoErr), nil, nil); sendErr != nil {
if sendErr := s.sendMsg(s.goAway(goAwayProtoErr), nil, nil, true); sendErr != nil {
s.logger.Printf("[WARN] yamux: failed to send go away: %v", sendErr)
}
return err
......@@ -592,7 +629,7 @@ func (s *Session) handleStreamMessage(hdr header) error {
// Read the new data
if err := stream.readData(hdr, flags, s.reader); err != nil {
if sendErr := s.sendMsg(s.goAway(goAwayProtoErr), nil, nil); sendErr != nil {
if sendErr := s.sendMsg(s.goAway(goAwayProtoErr), nil, nil, true); sendErr != nil {
s.logger.Printf("[WARN] yamux: failed to send go away: %v", sendErr)
}
return err
......@@ -610,7 +647,7 @@ func (s *Session) handlePing(hdr header) error {
if flags&flagSYN == flagSYN {
go func() {
hdr := encode(typePing, flagACK, 0, pingID)
if err := s.sendMsg(hdr, nil, nil); err != nil {
if err := s.sendMsg(hdr, nil, nil, true); err != nil {
s.logger.Printf("[WARN] yamux: failed to send ping reply: %v", err)
}
}()
......@@ -656,7 +693,7 @@ func (s *Session) incomingStream(id uint32) error {
// Reject immediately if we are doing a go away
if atomic.LoadInt32(&s.localGoAway) == 1 {
hdr := encode(typeWindowUpdate, flagRST, id, 0)
return s.sendMsg(hdr, nil, nil)
return s.sendMsg(hdr, nil, nil, true)
}
// Allocate a new stream
......@@ -668,7 +705,7 @@ func (s *Session) incomingStream(id uint32) error {
// Check if stream already exists
if _, ok := s.streams[id]; ok {
s.logger.Printf("[ERR] yamux: duplicate stream declared")
if sendErr := s.sendMsg(s.goAway(goAwayProtoErr), nil, nil); sendErr != nil {
if sendErr := s.sendMsg(s.goAway(goAwayProtoErr), nil, nil, true); sendErr != nil {
s.logger.Printf("[WARN] yamux: failed to send go away: %v", sendErr)
}
return ErrDuplicateStream
......@@ -686,7 +723,7 @@ func (s *Session) incomingStream(id uint32) error {
s.logger.Printf("[WARN] yamux: backlog exceeded, forcing connection reset")
delete(s.streams, id)
hdr := encode(typeWindowUpdate, flagRST, id, 0)
return s.sendMsg(hdr, nil, nil)
return s.sendMsg(hdr, nil, nil, false)
}
}
......
......@@ -12,7 +12,13 @@ import (
)
func TestSession_PingOfDeath(t *testing.T) {
client, server := testClientServerConfig(testConfNoKeepAlive())
conf := testConfNoKeepAlive()
// This test is slow and can easily time out on writes on CI.
//
// In the future, we might want to prioritize ping-replies over even
// other control messages, but that seems like overkill for now.
conf.ConnectionWriteTimeout = 1 * time.Second
client, server := testClientServerConfig(conf)
defer client.Close()
defer server.Close()
......
......@@ -1197,7 +1197,7 @@ func TestSession_sendMsg_Timeout(t *testing.T) {
hdr := encode(typePing, flagACK, 0, 0)
for {
err := client.sendMsg(hdr, nil, nil)
err := client.sendMsg(hdr, nil, nil, true)
if err == nil {
continue
} else if err == ErrConnectionWriteTimeout {
......
......@@ -6,7 +6,7 @@ import (
"sync/atomic"
"time"
"github.com/libp2p/go-buffer-pool"
pool "github.com/libp2p/go-buffer-pool"
)
type streamState int
......@@ -20,6 +20,7 @@ const (
streamRemoteClose
streamClosed
streamReset
streamWriteTimeout
)
// Stream is used to represent a logical stream
......@@ -41,6 +42,7 @@ type Stream struct {
recvNotifyCh chan struct{}
sendNotifyCh chan struct{}
sendCh chan []byte
readDeadline, writeDeadline pipeDeadline
}
......@@ -164,12 +166,13 @@ START:
// Determine the flags if any
flags = s.sendFlags()
// Send up to min(message, window
// Send up to min(message, window)
max = min(window, s.session.config.MaxMessageSize-headerSize, uint32(len(b)))
// Send the header
hdr = encode(typeData, flags, s.id, max)
if err = s.session.sendMsg(hdr, b[:max], s.writeDeadline.wait()); err != nil {
if err = s.session.sendMsg(hdr, b[:max], s.writeDeadline.wait(), false); err != nil {
// Indicate queued message.
return 0, err
}
......@@ -228,7 +231,7 @@ func (s *Stream) sendWindowUpdate() error {
// Send the header
hdr := encode(typeWindowUpdate, flags, s.id, delta)
if err := s.session.sendMsg(hdr, nil, nil); err != nil {
if err := s.session.sendMsg(hdr, nil, nil, true); err != nil {
return err
}
return nil
......@@ -239,13 +242,13 @@ func (s *Stream) sendClose() error {
flags := s.sendFlags()
flags |= flagFIN
hdr := encode(typeWindowUpdate, flags, s.id, 0)
return s.session.sendMsg(hdr, nil, nil)
return s.session.sendMsg(hdr, nil, nil, false)
}
// sendReset is used to send a RST
func (s *Stream) sendReset() error {
hdr := encode(typeWindowUpdate, flagRST, s.id, 0)
return s.session.sendMsg(hdr, nil, nil)
return s.session.sendMsg(hdr, nil, nil, false)
}
// Reset resets the stream (forcibly closes the stream)
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment