Commit 5c564a8b authored by Marten Seemann's avatar Marten Seemann

merge segmentedBuffer.TryReserve and Append

parent 96b99cb4
......@@ -422,15 +422,9 @@ func (s *Stream) readData(hdr header, flags uint16, conn io.Reader) error {
return nil
}
// Validate it's okay to copy
if !s.recvBuf.TryReserve(length) {
s.session.logger.Printf("[ERR] yamux: receive window exceeded (stream: %d, remain: %d, recv: %d)", s.id, s.recvBuf.Cap(), length)
return ErrRecvWindowExceeded
}
// Copy into buffer
if err := s.recvBuf.Append(conn, length); err != nil {
s.session.logger.Printf("[ERR] yamux: Failed to read stream data: %v", err)
s.session.logger.Printf("[ERR] yamux: Failed to read stream data on stream %d: %v", s.id, err)
return err
}
// Unblock the reader
......
package yamux
import (
"fmt"
"io"
"sync"
......@@ -42,14 +43,12 @@ func min(values ...uint32) uint32 {
// | data | empty space |
// < window (10) >
// < len (5) > < cap (5) >
// < pending (4) >
//
// As data is read, the buffer gets updated like so:
//
// | data | empty space |
// < window (8) >
// < len (3) > < cap (5) >
// < pending (4) >
//
// It can then grow as follows (given a "max" of 10):
//
......@@ -57,21 +56,18 @@ func min(values ...uint32) uint32 {
// | data | empty space |
// < window (10) >
// < len (3) > < cap (7) >
// < pending (4) >
//
// Data can then be written into the pending space, expanding len, and shrinking
// cap and pending:
// Data can then be written into the empty space, expanding len,
// and shrinking cap:
//
// | data | empty space |
// < window (10) >
// < len (5) > < cap (5) >
// < pending (2)>
//
type segmentedBuffer struct {
cap uint32
pending uint32
len uint32
bm sync.Mutex
cap uint32
len uint32
bm sync.Mutex
// read position in b[0].
// We must not reslice any of the buffers in b, as we need to put them back into the pool.
readPos int
......@@ -120,16 +116,6 @@ func (s *segmentedBuffer) GrowTo(max uint32, force bool) (bool, uint32) {
return true, delta
}
func (s *segmentedBuffer) TryReserve(space uint32) bool {
s.bm.Lock()
defer s.bm.Unlock()
if s.cap < s.pending+space {
return false
}
s.pending += space
return true
}
func (s *segmentedBuffer) Read(b []byte) (int, error) {
s.bm.Lock()
defer s.bm.Unlock()
......@@ -152,7 +138,20 @@ func (s *segmentedBuffer) Read(b []byte) (int, error) {
return n, nil
}
func (s *segmentedBuffer) checkOverflow(l uint32) error {
s.bm.Lock()
defer s.bm.Unlock()
if s.cap < l {
return fmt.Errorf("receive window exceeded (remain: %d, recv: %d)", s.cap, l)
}
return nil
}
func (s *segmentedBuffer) Append(input io.Reader, length uint32) error {
if err := s.checkOverflow(length); err != nil {
return err
}
dst := pool.Get(int(length))
n, err := io.ReadFull(input, dst)
if err == io.EOF {
......@@ -163,7 +162,6 @@ func (s *segmentedBuffer) Append(input io.Reader, length uint32) error {
if n > 0 {
s.len += uint32(n)
s.cap -= uint32(n)
s.pending = s.pending - length
s.b = append(s.b, dst[0:n])
}
return err
......
......@@ -63,9 +63,6 @@ func TestSegmentedBuffer(t *testing.T) {
}
}
assert(0, 100)
if !buf.TryReserve(3) {
t.Fatal("reservation should have worked")
}
if err := buf.Append(bytes.NewReader([]byte("fooo")), 3); err != nil {
t.Fatal(err)
}
......@@ -87,9 +84,6 @@ func TestSegmentedBuffer(t *testing.T) {
t.Fatal("should have grown by 2")
}
if !buf.TryReserve(50) {
t.Fatal("reservation should have worked")
}
if err := buf.Append(bytes.NewReader(make([]byte, 50)), 50); err != nil {
t.Fatal(err)
}
......@@ -104,9 +98,7 @@ func TestSegmentedBuffer(t *testing.T) {
if read != 50 {
t.Fatal("expected to read 50 bytes")
}
if !buf.TryReserve(49) {
t.Fatal("should have been able to reserve rest of space")
}
assert(1, 49)
if grew, amount := buf.GrowTo(100, false); !grew || amount != 50 {
t.Fatal("should have grown when below half, even with reserved space")
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment