Unverified Commit fd43d7f1 authored by Steven Allen's avatar Steven Allen Committed by GitHub

Merge pull request #33 from libp2p/fix/recv-buf

fix space accounting in the receive buffer
parents 0135c852 134ee206
......@@ -52,7 +52,7 @@ func newStream(session *Session, id uint32, state streamState) *Stream {
sendWindow: initialStreamWindow,
readDeadline: makePipeDeadline(),
writeDeadline: makePipeDeadline(),
recvBuf: NewSegmentedBuffer(initialStreamWindow),
recvBuf: newSegmentedBuffer(initialStreamWindow),
recvNotifyCh: make(chan struct{}, 1),
sendNotifyCh: make(chan struct{}, 1),
}
......
......@@ -3,7 +3,6 @@ package yamux
import (
"io"
"sync"
"sync/atomic"
pool "github.com/libp2p/go-buffer-pool"
)
......@@ -38,6 +37,36 @@ func min(values ...uint32) uint32 {
return m
}
// The segmented buffer looks like:
//
// | data | empty space |
// < window (10) >
// < len (5) > < cap (5) >
// < pending (4) >
//
// As data is read, the buffer gets updated like so:
//
// | data | empty space |
// < window (8) >
// < len (3) > < cap (5) >
// < pending (4) >
//
// It can then grow as follows (given a "max" of 10):
//
//
// | data | empty space |
// < window (10) >
// < len (3) > < cap (7) >
// < pending (4) >
//
// Data can then be written into the pending space, expanding len, and shrinking
// cap and pending:
//
// | data | empty space |
// < window (10) >
// < len (5) > < cap (5) >
// < pending (2)>
//
type segmentedBuffer struct {
cap uint32
pending uint32
......@@ -47,16 +76,27 @@ type segmentedBuffer struct {
}
// NewSegmentedBuffer allocates a ring buffer.
func NewSegmentedBuffer(initialCapacity uint32) segmentedBuffer {
func newSegmentedBuffer(initialCapacity uint32) segmentedBuffer {
return segmentedBuffer{cap: initialCapacity, b: make([][]byte, 0)}
}
// Len is the amount of data in the receive buffer.
func (s *segmentedBuffer) Len() int {
return int(atomic.LoadUint32(&s.len))
s.bm.Lock()
len := s.len
s.bm.Unlock()
return int(len)
}
// Cap is the remaining capacity in the receive buffer.
//
// Note: this is _not_ the same as go's 'cap' function. The total size of the
// buffer is len+cap.
func (s *segmentedBuffer) Cap() uint32 {
return atomic.LoadUint32(&s.cap)
s.bm.Lock()
cap := s.cap
s.bm.Unlock()
return cap
}
// If the space to write into + current buffer size has grown to half of the window size,
......@@ -65,15 +105,9 @@ func (s *segmentedBuffer) GrowTo(max uint32, force bool) (bool, uint32) {
s.bm.Lock()
defer s.bm.Unlock()
currentWindow := atomic.LoadUint32(&s.len) + atomic.LoadUint32(&s.cap) + s.pending
if currentWindow > max {
// somewhat counter-intuitively not an error.
// note that len+cap is the 'window' that shouldn't exceed max or a reservation
// would fail, triggering an error.
// We pre-count 'pending' data where we've read a header and are working on
// reading it into available data here, so that we don't undercount the remaining
// window size, but that can mean this sum ends up larger than max.
return false, 0
currentWindow := s.cap + s.len
if currentWindow >= max {
return force, 0
}
delta := max - currentWindow
......@@ -81,16 +115,14 @@ func (s *segmentedBuffer) GrowTo(max uint32, force bool) (bool, uint32) {
return false, 0
}
atomic.AddUint32(&s.cap, delta)
s.cap += delta
return true, delta
}
func (s *segmentedBuffer) TryReserve(space uint32) bool {
// It is noticable that the check-and-set of pending is not atomic,
// Due to this, accesses to pending are protected by bm.
s.bm.Lock()
defer s.bm.Unlock()
if atomic.LoadUint32(&s.cap) < s.pending+space {
if s.cap < s.pending+space {
return false
}
s.pending += space
......@@ -112,7 +144,7 @@ func (s *segmentedBuffer) Read(b []byte) (int, error) {
s.b[0] = s.b[0][n:]
}
if n > 0 {
atomic.AddUint32(&s.len, ^uint32(n-1))
s.len -= uint32(n)
}
return n, nil
}
......@@ -130,16 +162,15 @@ func (s *segmentedBuffer) Append(input io.Reader, length int) error {
if length == n {
err = nil
} else {
err = ErrStreamReset
err = io.ErrUnexpectedEOF
}
}
s.bm.Lock()
defer s.bm.Unlock()
if n > 0 {
atomic.AddUint32(&s.len, uint32(n))
// cap -= n
atomic.AddUint32(&s.cap, ^uint32(n-1))
s.len += uint32(n)
s.cap -= uint32(n)
s.pending = s.pending - uint32(length)
s.b = append(s.b, dst[0:n])
}
......
package yamux
import (
"bytes"
"io"
"io/ioutil"
"testing"
)
......@@ -48,3 +51,65 @@ func TestMin(t *testing.T) {
t.Fatalf("bad")
}
}
func TestSegmentedBuffer(t *testing.T) {
buf := newSegmentedBuffer(100)
assert := func(len, cap int) {
if buf.Len() != len {
t.Fatalf("expected length %d, got %d", len, buf.Len())
}
if buf.Cap() != uint32(cap) {
t.Fatalf("expected length %d, got %d", len, buf.Len())
}
}
assert(0, 100)
if !buf.TryReserve(3) {
t.Fatal("reservation should have worked")
}
if err := buf.Append(bytes.NewReader([]byte("fooo")), 3); err != nil {
t.Fatal(err)
}
assert(3, 97)
out := make([]byte, 2)
n, err := io.ReadFull(&buf, out)
if err != nil {
t.Fatal(err)
}
if n != 2 {
t.Fatalf("expected to read 2 bytes, read %d", n)
}
assert(1, 97)
if grew, amount := buf.GrowTo(100, false); grew || amount != 0 {
t.Fatal("should not grow when too small")
}
if grew, amount := buf.GrowTo(100, true); !grew || amount != 2 {
t.Fatal("should have grown by 2")
}
if !buf.TryReserve(50) {
t.Fatal("reservation should have worked")
}
if err := buf.Append(bytes.NewReader(make([]byte, 50)), 50); err != nil {
t.Fatal(err)
}
assert(51, 49)
if grew, amount := buf.GrowTo(100, false); grew || amount != 0 {
t.Fatal("should not grow when data hasn't been read")
}
read, err := io.CopyN(ioutil.Discard, &buf, 50)
if err != nil {
t.Fatal(err)
}
if read != 50 {
t.Fatal("expected to read 50 bytes")
}
if !buf.TryReserve(49) {
t.Fatal("should have been able to reserve rest of space")
}
assert(1, 49)
if grew, amount := buf.GrowTo(100, false); !grew || amount != 50 {
t.Fatal("should have grown when below half, even with reserved space")
}
assert(1, 99)
}
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment