Unverified Commit 42b17e90 authored by Will's avatar Will Committed by GitHub

Merge pull request #23 from libp2p/revert/ctrlbypass

Revert ctrlbypass
parents d913bbd8 d3225e95
...@@ -47,10 +47,6 @@ type Config struct { ...@@ -47,10 +47,6 @@ type Config struct {
// MaxMessageSize is the maximum size of a message that we'll send on a // MaxMessageSize is the maximum size of a message that we'll send on a
// stream. This ensures that a single stream doesn't hog a connection. // stream. This ensures that a single stream doesn't hog a connection.
MaxMessageSize uint32 MaxMessageSize uint32
// SendQueueSize is the maximum number of messages we'll keep in the local
// send queue before applying back pressure to writers.
SendQueueSize uint32
} }
// DefaultConfig is used to return a default configuration // DefaultConfig is used to return a default configuration
...@@ -65,7 +61,6 @@ func DefaultConfig() *Config { ...@@ -65,7 +61,6 @@ func DefaultConfig() *Config {
ReadBufSize: 4096, ReadBufSize: 4096,
MaxMessageSize: 64 * 1024, // Means 64KiB/10s = 52kbps minimum speed. MaxMessageSize: 64 * 1024, // Means 64KiB/10s = 52kbps minimum speed.
WriteCoalesceDelay: 100 * time.Microsecond, WriteCoalesceDelay: 100 * time.Microsecond,
SendQueueSize: 64,
} }
} }
......
...@@ -14,7 +14,7 @@ import ( ...@@ -14,7 +14,7 @@ import (
"sync/atomic" "sync/atomic"
"time" "time"
pool "github.com/libp2p/go-buffer-pool" "github.com/libp2p/go-buffer-pool"
) )
// Session is used to wrap a reliable ordered connection and to // Session is used to wrap a reliable ordered connection and to
...@@ -67,9 +67,6 @@ type Session struct { ...@@ -67,9 +67,6 @@ type Session struct {
// sendCh is used to send messages // sendCh is used to send messages
sendCh chan []byte sendCh chan []byte
// sendCtrlCh is used to send control messages (skipping the normal send queue)
sendCtrlCh chan []byte
// recvDoneCh is closed when recv() exits to avoid a race // recvDoneCh is closed when recv() exits to avoid a race
// between stream registration and stream shutdown // between stream registration and stream shutdown
recvDoneCh chan struct{} recvDoneCh chan struct{}
...@@ -112,8 +109,7 @@ func newSession(config *Config, conn net.Conn, client bool, readBuf int) *Sessio ...@@ -112,8 +109,7 @@ func newSession(config *Config, conn net.Conn, client bool, readBuf int) *Sessio
inflight: make(map[uint32]struct{}), inflight: make(map[uint32]struct{}),
synCh: make(chan struct{}, config.AcceptBacklog), synCh: make(chan struct{}, config.AcceptBacklog),
acceptCh: make(chan *Stream, config.AcceptBacklog), acceptCh: make(chan *Stream, config.AcceptBacklog),
sendCh: make(chan []byte, config.SendQueueSize), sendCh: make(chan []byte, 64),
sendCtrlCh: make(chan []byte, 16),
recvDoneCh: make(chan struct{}), recvDoneCh: make(chan struct{}),
sendDoneCh: make(chan struct{}), sendDoneCh: make(chan struct{}),
shutdownCh: make(chan struct{}), shutdownCh: make(chan struct{}),
...@@ -274,7 +270,7 @@ func (s *Session) exitErr(err error) { ...@@ -274,7 +270,7 @@ func (s *Session) exitErr(err error) {
// GoAway can be used to prevent accepting further // GoAway can be used to prevent accepting further
// connections. It does not close the underlying conn. // connections. It does not close the underlying conn.
func (s *Session) GoAway() error { func (s *Session) GoAway() error {
return s.sendMsg(s.goAway(goAwayNormal), nil, nil, true) return s.sendMsg(s.goAway(goAwayNormal), nil, nil)
} }
// goAway is used to send a goAway message // goAway is used to send a goAway message
...@@ -298,7 +294,7 @@ func (s *Session) Ping() (time.Duration, error) { ...@@ -298,7 +294,7 @@ func (s *Session) Ping() (time.Duration, error) {
// Send the ping request // Send the ping request
hdr := encode(typePing, flagSYN, 0, id) hdr := encode(typePing, flagSYN, 0, id)
if err := s.sendMsg(hdr, nil, nil, true); err != nil { if err := s.sendMsg(hdr, nil, nil); err != nil {
return 0, err return 0, err
} }
...@@ -378,7 +374,7 @@ func (s *Session) extendKeepalive() { ...@@ -378,7 +374,7 @@ func (s *Session) extendKeepalive() {
} }
// send sends the header and body. // send sends the header and body.
func (s *Session) sendMsg(hdr header, body []byte, deadline <-chan struct{}, control bool) error { func (s *Session) sendMsg(hdr header, body []byte, deadline <-chan struct{}) error {
select { select {
case <-s.shutdownCh: case <-s.shutdownCh:
return s.shutdownErr return s.shutdownErr
...@@ -390,17 +386,11 @@ func (s *Session) sendMsg(hdr header, body []byte, deadline <-chan struct{}, con ...@@ -390,17 +386,11 @@ func (s *Session) sendMsg(hdr header, body []byte, deadline <-chan struct{}, con
copy(buf[:headerSize], hdr[:]) copy(buf[:headerSize], hdr[:])
copy(buf[headerSize:], body) copy(buf[headerSize:], body)
var sendCh chan []byte
if control {
sendCh = s.sendCtrlCh
} else {
sendCh = s.sendCh
}
select { select {
case <-s.shutdownCh: case <-s.shutdownCh:
pool.Put(buf) pool.Put(buf)
return s.shutdownErr return s.shutdownErr
case sendCh <- buf: case s.sendCh <- buf:
return nil return nil
case <-deadline: case <-deadline:
pool.Put(buf) pool.Put(buf)
...@@ -456,65 +446,38 @@ func (s *Session) sendLoop() error { ...@@ -456,65 +446,38 @@ func (s *Session) sendLoop() error {
default: default:
} }
var buf []byte
// Preferentially use control channel.
select {
case buf = <-s.sendCtrlCh:
goto SEND
case <-s.shutdownCh:
return nil
default:
}
// Flushes at least once every 100 microseconds unless we're // Flushes at least once every 100 microseconds unless we're
// constantly writing. // constantly writing.
var buf []byte
select { select {
case buf = <-s.sendCh: case buf = <-s.sendCh:
goto SEND
case buf = <-s.sendCtrlCh:
goto SEND
case <-s.shutdownCh:
return nil
default:
}
select {
case buf = <-s.sendCh:
goto SEND
case buf = <-s.sendCtrlCh:
goto SEND
case <-s.shutdownCh:
return nil
case <-writeTimeoutCh:
}
if err := writer.Flush(); err != nil {
if os.IsTimeout(err) {
err = ErrConnectionWriteTimeout
}
return err
}
// Preferentially use control channel.
select {
case buf = <-s.sendCtrlCh:
case <-s.shutdownCh: case <-s.shutdownCh:
return nil return nil
default: default:
select { select {
case buf = <-s.sendCh: case buf = <-s.sendCh:
case buf = <-s.sendCtrlCh:
case <-s.shutdownCh: case <-s.shutdownCh:
return nil return nil
case <-writeTimeoutCh:
if err := writer.Flush(); err != nil {
if os.IsTimeout(err) {
err = ErrConnectionWriteTimeout
}
return err
}
select {
case buf = <-s.sendCh:
case <-s.shutdownCh:
return nil
}
if writeTimeout != nil {
writeTimeout.Reset(s.config.WriteCoalesceDelay)
}
} }
} }
if writeTimeout != nil {
writeTimeout.Reset(s.config.WriteCoalesceDelay)
}
SEND:
if err := extendWriteDeadline(); err != nil { if err := extendWriteDeadline(); err != nil {
pool.Put(buf) pool.Put(buf)
return err return err
...@@ -619,7 +582,7 @@ func (s *Session) handleStreamMessage(hdr header) error { ...@@ -619,7 +582,7 @@ func (s *Session) handleStreamMessage(hdr header) error {
// Check if this is a window update // Check if this is a window update
if hdr.MsgType() == typeWindowUpdate { if hdr.MsgType() == typeWindowUpdate {
if err := stream.incrSendWindow(hdr, flags); err != nil { if err := stream.incrSendWindow(hdr, flags); err != nil {
if sendErr := s.sendMsg(s.goAway(goAwayProtoErr), nil, nil, true); sendErr != nil { if sendErr := s.sendMsg(s.goAway(goAwayProtoErr), nil, nil); sendErr != nil {
s.logger.Printf("[WARN] yamux: failed to send go away: %v", sendErr) s.logger.Printf("[WARN] yamux: failed to send go away: %v", sendErr)
} }
return err return err
...@@ -629,7 +592,7 @@ func (s *Session) handleStreamMessage(hdr header) error { ...@@ -629,7 +592,7 @@ func (s *Session) handleStreamMessage(hdr header) error {
// Read the new data // Read the new data
if err := stream.readData(hdr, flags, s.reader); err != nil { if err := stream.readData(hdr, flags, s.reader); err != nil {
if sendErr := s.sendMsg(s.goAway(goAwayProtoErr), nil, nil, true); sendErr != nil { if sendErr := s.sendMsg(s.goAway(goAwayProtoErr), nil, nil); sendErr != nil {
s.logger.Printf("[WARN] yamux: failed to send go away: %v", sendErr) s.logger.Printf("[WARN] yamux: failed to send go away: %v", sendErr)
} }
return err return err
...@@ -647,7 +610,7 @@ func (s *Session) handlePing(hdr header) error { ...@@ -647,7 +610,7 @@ func (s *Session) handlePing(hdr header) error {
if flags&flagSYN == flagSYN { if flags&flagSYN == flagSYN {
go func() { go func() {
hdr := encode(typePing, flagACK, 0, pingID) hdr := encode(typePing, flagACK, 0, pingID)
if err := s.sendMsg(hdr, nil, nil, true); err != nil { if err := s.sendMsg(hdr, nil, nil); err != nil {
s.logger.Printf("[WARN] yamux: failed to send ping reply: %v", err) s.logger.Printf("[WARN] yamux: failed to send ping reply: %v", err)
} }
}() }()
...@@ -693,7 +656,7 @@ func (s *Session) incomingStream(id uint32) error { ...@@ -693,7 +656,7 @@ func (s *Session) incomingStream(id uint32) error {
// Reject immediately if we are doing a go away // Reject immediately if we are doing a go away
if atomic.LoadInt32(&s.localGoAway) == 1 { if atomic.LoadInt32(&s.localGoAway) == 1 {
hdr := encode(typeWindowUpdate, flagRST, id, 0) hdr := encode(typeWindowUpdate, flagRST, id, 0)
return s.sendMsg(hdr, nil, nil, true) return s.sendMsg(hdr, nil, nil)
} }
// Allocate a new stream // Allocate a new stream
...@@ -705,7 +668,7 @@ func (s *Session) incomingStream(id uint32) error { ...@@ -705,7 +668,7 @@ func (s *Session) incomingStream(id uint32) error {
// Check if stream already exists // Check if stream already exists
if _, ok := s.streams[id]; ok { if _, ok := s.streams[id]; ok {
s.logger.Printf("[ERR] yamux: duplicate stream declared") s.logger.Printf("[ERR] yamux: duplicate stream declared")
if sendErr := s.sendMsg(s.goAway(goAwayProtoErr), nil, nil, true); sendErr != nil { if sendErr := s.sendMsg(s.goAway(goAwayProtoErr), nil, nil); sendErr != nil {
s.logger.Printf("[WARN] yamux: failed to send go away: %v", sendErr) s.logger.Printf("[WARN] yamux: failed to send go away: %v", sendErr)
} }
return ErrDuplicateStream return ErrDuplicateStream
...@@ -723,7 +686,7 @@ func (s *Session) incomingStream(id uint32) error { ...@@ -723,7 +686,7 @@ func (s *Session) incomingStream(id uint32) error {
s.logger.Printf("[WARN] yamux: backlog exceeded, forcing connection reset") s.logger.Printf("[WARN] yamux: backlog exceeded, forcing connection reset")
delete(s.streams, id) delete(s.streams, id)
hdr := encode(typeWindowUpdate, flagRST, id, 0) hdr := encode(typeWindowUpdate, flagRST, id, 0)
return s.sendMsg(hdr, nil, nil, false) return s.sendMsg(hdr, nil, nil)
} }
} }
......
...@@ -1197,7 +1197,7 @@ func TestSession_sendMsg_Timeout(t *testing.T) { ...@@ -1197,7 +1197,7 @@ func TestSession_sendMsg_Timeout(t *testing.T) {
hdr := encode(typePing, flagACK, 0, 0) hdr := encode(typePing, flagACK, 0, 0)
for { for {
err := client.sendMsg(hdr, nil, nil, true) err := client.sendMsg(hdr, nil, nil)
if err == nil { if err == nil {
continue continue
} else if err == ErrConnectionWriteTimeout { } else if err == ErrConnectionWriteTimeout {
......
...@@ -6,7 +6,7 @@ import ( ...@@ -6,7 +6,7 @@ import (
"sync/atomic" "sync/atomic"
"time" "time"
pool "github.com/libp2p/go-buffer-pool" "github.com/libp2p/go-buffer-pool"
) )
type streamState int type streamState int
...@@ -20,7 +20,6 @@ const ( ...@@ -20,7 +20,6 @@ const (
streamRemoteClose streamRemoteClose
streamClosed streamClosed
streamReset streamReset
streamWriteTimeout
) )
// Stream is used to represent a logical stream // Stream is used to represent a logical stream
...@@ -42,7 +41,6 @@ type Stream struct { ...@@ -42,7 +41,6 @@ type Stream struct {
recvNotifyCh chan struct{} recvNotifyCh chan struct{}
sendNotifyCh chan struct{} sendNotifyCh chan struct{}
sendCh chan []byte
readDeadline, writeDeadline pipeDeadline readDeadline, writeDeadline pipeDeadline
} }
...@@ -166,13 +164,12 @@ START: ...@@ -166,13 +164,12 @@ START:
// Determine the flags if any // Determine the flags if any
flags = s.sendFlags() flags = s.sendFlags()
// Send up to min(message, window) // Send up to min(message, window
max = min(window, s.session.config.MaxMessageSize-headerSize, uint32(len(b))) max = min(window, s.session.config.MaxMessageSize-headerSize, uint32(len(b)))
// Send the header // Send the header
hdr = encode(typeData, flags, s.id, max) hdr = encode(typeData, flags, s.id, max)
if err = s.session.sendMsg(hdr, b[:max], s.writeDeadline.wait(), false); err != nil { if err = s.session.sendMsg(hdr, b[:max], s.writeDeadline.wait()); err != nil {
// Indicate queued message.
return 0, err return 0, err
} }
...@@ -231,7 +228,7 @@ func (s *Stream) sendWindowUpdate() error { ...@@ -231,7 +228,7 @@ func (s *Stream) sendWindowUpdate() error {
// Send the header // Send the header
hdr := encode(typeWindowUpdate, flags, s.id, delta) hdr := encode(typeWindowUpdate, flags, s.id, delta)
if err := s.session.sendMsg(hdr, nil, nil, true); err != nil { if err := s.session.sendMsg(hdr, nil, nil); err != nil {
return err return err
} }
return nil return nil
...@@ -242,13 +239,13 @@ func (s *Stream) sendClose() error { ...@@ -242,13 +239,13 @@ func (s *Stream) sendClose() error {
flags := s.sendFlags() flags := s.sendFlags()
flags |= flagFIN flags |= flagFIN
hdr := encode(typeWindowUpdate, flags, s.id, 0) hdr := encode(typeWindowUpdate, flags, s.id, 0)
return s.session.sendMsg(hdr, nil, nil, false) return s.session.sendMsg(hdr, nil, nil)
} }
// sendReset is used to send a RST // sendReset is used to send a RST
func (s *Stream) sendReset() error { func (s *Stream) sendReset() error {
hdr := encode(typeWindowUpdate, flagRST, s.id, 0) hdr := encode(typeWindowUpdate, flagRST, s.id, 0)
return s.session.sendMsg(hdr, nil, nil, false) return s.session.sendMsg(hdr, nil, nil)
} }
// Reset resets the stream (forcibly closes the stream) // Reset resets the stream (forcibly closes the stream)
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment