Unverified Commit aaf9be96 authored by vyzo's avatar vyzo Committed by GitHub

Merge pull request #54 from libp2p/feat/coalesce-write

coalesce writes
parents 9c275bbc 03d48f46
...@@ -37,6 +37,8 @@ var ErrInvalidState = errors.New("received an unexpected message from the peer") ...@@ -37,6 +37,8 @@ var ErrInvalidState = errors.New("received an unexpected message from the peer")
var ( var (
NewStreamTimeout = time.Minute NewStreamTimeout = time.Minute
ResetStreamTimeout = 2 * time.Minute ResetStreamTimeout = 2 * time.Minute
WriteCoalesceDelay = 100 * time.Microsecond
) )
// +1 for initiator // +1 for initiator
...@@ -59,7 +61,9 @@ type Multiplex struct { ...@@ -59,7 +61,9 @@ type Multiplex struct {
shutdownErr error shutdownErr error
shutdownLock sync.Mutex shutdownLock sync.Mutex
wrTkn chan struct{} writeCh chan []byte
writeTimer *time.Timer
writeTimerFired bool
nstreams chan *Stream nstreams chan *Stream
...@@ -70,19 +74,19 @@ type Multiplex struct { ...@@ -70,19 +74,19 @@ type Multiplex struct {
// NewMultiplex creates a new multiplexer session. // NewMultiplex creates a new multiplexer session.
func NewMultiplex(con net.Conn, initiator bool) *Multiplex { func NewMultiplex(con net.Conn, initiator bool) *Multiplex {
mp := &Multiplex{ mp := &Multiplex{
con: con, con: con,
initiator: initiator, initiator: initiator,
buf: bufio.NewReader(con), buf: bufio.NewReader(con),
channels: make(map[streamID]*Stream), channels: make(map[streamID]*Stream),
closed: make(chan struct{}), closed: make(chan struct{}),
shutdown: make(chan struct{}), shutdown: make(chan struct{}),
wrTkn: make(chan struct{}, 1), writeCh: make(chan []byte, 16),
nstreams: make(chan *Stream, 16), writeTimer: time.NewTimer(0),
nstreams: make(chan *Stream, 16),
} }
go mp.handleIncoming() go mp.handleIncoming()
go mp.handleOutgoing()
mp.wrTkn <- struct{}{}
return mp return mp
} }
...@@ -146,7 +150,6 @@ func (mp *Multiplex) IsClosed() bool { ...@@ -146,7 +150,6 @@ func (mp *Multiplex) IsClosed() bool {
func (mp *Multiplex) sendMsg(ctx context.Context, header uint64, data []byte) error { func (mp *Multiplex) sendMsg(ctx context.Context, header uint64, data []byte) error {
buf := pool.Get(len(data) + 20) buf := pool.Get(len(data) + 20)
defer pool.Put(buf)
n := 0 n := 0
n += binary.PutUvarint(buf[n:], header) n += binary.PutUvarint(buf[n:], header)
...@@ -154,37 +157,102 @@ func (mp *Multiplex) sendMsg(ctx context.Context, header uint64, data []byte) er ...@@ -154,37 +157,102 @@ func (mp *Multiplex) sendMsg(ctx context.Context, header uint64, data []byte) er
n += copy(buf[n:], data) n += copy(buf[n:], data)
select { select {
case tkn := <-mp.wrTkn: case mp.writeCh <- buf[:n]:
defer func() { mp.wrTkn <- tkn }() return nil
case <-mp.shutdown:
return ErrShutdown
case <-ctx.Done(): case <-ctx.Done():
return ctx.Err() return ctx.Err()
} }
}
if mp.isShutdown() { func (mp *Multiplex) handleOutgoing() {
return ErrShutdown for {
} select {
case <-mp.shutdown:
return
dl, hasDl := ctx.Deadline() case data := <-mp.writeCh:
if hasDl { err := mp.writeMsg(data)
if err := mp.con.SetWriteDeadline(dl); err != nil { if err != nil {
return err // the connection is closed by this time
log.Warningf("error writing data: %s", err.Error())
return
}
} }
} }
}
written, err := mp.con.Write(buf[:n]) func (mp *Multiplex) writeMsg(data []byte) error {
if err != nil && (written > 0 || isFatalNetworkError(err)) { if len(data) >= 512 {
// Bail. We've written partial message or it's a fatal error and can't do anything return mp.doWriteMsg(data)
// about this.
mp.closeNoWait()
return err
} }
if hasDl { buf := pool.Get(4096)
// only return this error if we don't *already* have an error from the write. defer pool.Put(buf)
if err2 := mp.con.SetWriteDeadline(time.Time{}); err == nil && err2 != nil {
return err2 n := copy(buf, data)
pool.Put(data)
if !mp.writeTimerFired {
if !mp.writeTimer.Stop() {
<-mp.writeTimer.C
} }
} }
mp.writeTimer.Reset(WriteCoalesceDelay)
mp.writeTimerFired = false
for {
select {
case data = <-mp.writeCh:
wr := copy(buf[n:], data)
if wr < len(data) {
// we filled the buffer, send it
err := mp.doWriteMsg(buf)
if err != nil {
pool.Put(data)
return err
}
if len(data)-wr >= 512 {
// the remaining data is not a small write, send it
err := mp.doWriteMsg(data[wr:])
pool.Put(data)
return err
}
n = copy(buf, data[wr:])
// we've written some, reset the timer to coalesce the rest
if !mp.writeTimer.Stop() {
<-mp.writeTimer.C
}
mp.writeTimer.Reset(WriteCoalesceDelay)
} else {
n += wr
}
pool.Put(data)
case <-mp.writeTimer.C:
mp.writeTimerFired = true
return mp.doWriteMsg(buf[:n])
case <-mp.shutdown:
return ErrShutdown
}
}
}
func (mp *Multiplex) doWriteMsg(data []byte) error {
if mp.isShutdown() {
return ErrShutdown
}
_, err := mp.con.Write(data)
if err != nil {
mp.closeNoWait()
}
return err return err
} }
......
...@@ -40,7 +40,7 @@ func TestSlowReader(t *testing.T) { ...@@ -40,7 +40,7 @@ func TestSlowReader(t *testing.T) {
// 100 is large enough that the buffer of the underlying connection will // 100 is large enough that the buffer of the underlying connection will
// fill up. // fill up.
for i := 0; i < 100; i++ { for i := 0; i < 10000; i++ {
_, err = sa.Write(mes) _, err = sa.Write(mes)
if err != nil { if err != nil {
break break
...@@ -346,7 +346,7 @@ func TestReset(t *testing.T) { ...@@ -346,7 +346,7 @@ func TestReset(t *testing.T) {
t.Fatalf("successfully wrote to reset stream") t.Fatalf("successfully wrote to reset stream")
} }
time.Sleep(10 * time.Millisecond) time.Sleep(200 * time.Millisecond)
n, err = sb.Write([]byte("test")) n, err = sb.Write([]byte("test"))
if n != 0 { if n != 0 {
...@@ -492,6 +492,8 @@ func TestFuzzCloseStream(t *testing.T) { ...@@ -492,6 +492,8 @@ func TestFuzzCloseStream(t *testing.T) {
} }
} }
time.Sleep(10 * time.Millisecond)
nchannels := 0 nchannels := 0
mpa.chLock.Lock() mpa.chLock.Lock()
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment