Unverified Commit 2471ea54 authored by Steven Allen's avatar Steven Allen Committed by GitHub

Merge pull request #14 from libp2p/fix/memory-improvements

combine writes and avoid a few more allocations
parents 90cce793 bde25859
...@@ -38,9 +38,6 @@ func (s *Chan) ReadFromWithPool(r io.Reader, p *pool.BufferPool) { ...@@ -38,9 +38,6 @@ func (s *Chan) ReadFromWithPool(r io.Reader, p *pool.BufferPool) {
// ReadFrom wraps the given io.Reader with a msgio.Reader, reads all // ReadFrom wraps the given io.Reader with a msgio.Reader, reads all
// messages, ands sends them down the channel. // messages, ands sends them down the channel.
func (s *Chan) readFrom(mr Reader) { func (s *Chan) readFrom(mr Reader) {
// single reader, no need for Mutex
mr.(*reader).lock = new(nullLocker)
Loop: Loop:
for { for {
buf, err := mr.ReadMsg() buf, err := mr.ReadMsg()
...@@ -74,8 +71,6 @@ func (s *Chan) WriteTo(w io.Writer) { ...@@ -74,8 +71,6 @@ func (s *Chan) WriteTo(w io.Writer) {
// if bottleneck, cycle around a set of buffers // if bottleneck, cycle around a set of buffers
mw := NewWriter(w) mw := NewWriter(w)
// single writer, no need for Mutex
mw.(*writer).lock = new(nullLocker)
Loop: Loop:
for { for {
select { select {
......
...@@ -76,13 +76,20 @@ type ReadWriteCloser interface { ...@@ -76,13 +76,20 @@ type ReadWriteCloser interface {
type writer struct { type writer struct {
W io.Writer W io.Writer
lock sync.Locker pool *pool.BufferPool
lock sync.Mutex
} }
// NewWriter wraps an io.Writer with a msgio framed writer. The msgio.Writer // NewWriter wraps an io.Writer with a msgio framed writer. The msgio.Writer
// will write the length prefix of every message written. // will write the length prefix of every message written.
func NewWriter(w io.Writer) WriteCloser { func NewWriter(w io.Writer) WriteCloser {
return &writer{W: w, lock: new(sync.Mutex)} return NewWriterWithPool(w, pool.GlobalPool)
}
// NewWriterWithPool is identical to NewWriter but allows the user to pass a
// custom buffer pool.
func NewWriterWithPool(w io.Writer, p *pool.BufferPool) WriteCloser {
return &writer{W: w, pool: p}
} }
func (s *writer) Write(msg []byte) (int, error) { func (s *writer) Write(msg []byte) (int, error) {
...@@ -96,10 +103,13 @@ func (s *writer) Write(msg []byte) (int, error) { ...@@ -96,10 +103,13 @@ func (s *writer) Write(msg []byte) (int, error) {
func (s *writer) WriteMsg(msg []byte) (err error) { func (s *writer) WriteMsg(msg []byte) (err error) {
s.lock.Lock() s.lock.Lock()
defer s.lock.Unlock() defer s.lock.Unlock()
if err := WriteLen(s.W, len(msg)); err != nil {
return err buf := s.pool.Get(len(msg) + lengthSize)
} NBO.PutUint32(buf, uint32(len(msg)))
_, err = s.W.Write(msg) copy(buf[lengthSize:], msg)
_, err = s.W.Write(buf)
s.pool.Put(buf)
return err return err
} }
...@@ -114,10 +124,10 @@ func (s *writer) Close() error { ...@@ -114,10 +124,10 @@ func (s *writer) Close() error {
type reader struct { type reader struct {
R io.Reader R io.Reader
lbuf []byte lbuf [lengthSize]byte
next int next int
pool *pool.BufferPool pool *pool.BufferPool
lock sync.Locker lock sync.Mutex
max int // the maximal message size (in bytes) this reader handles max int // the maximal message size (in bytes) this reader handles
} }
...@@ -137,10 +147,8 @@ func NewReaderWithPool(r io.Reader, p *pool.BufferPool) ReadCloser { ...@@ -137,10 +147,8 @@ func NewReaderWithPool(r io.Reader, p *pool.BufferPool) ReadCloser {
} }
return &reader{ return &reader{
R: r, R: r,
lbuf: make([]byte, lengthSize),
next: -1, next: -1,
pool: p, pool: p,
lock: new(sync.Mutex),
max: defaultMaxSize, max: defaultMaxSize,
} }
} }
...@@ -156,7 +164,7 @@ func (s *reader) NextMsgLen() (int, error) { ...@@ -156,7 +164,7 @@ func (s *reader) NextMsgLen() (int, error) {
func (s *reader) nextMsgLen() (int, error) { func (s *reader) nextMsgLen() (int, error) {
if s.next == -1 { if s.next == -1 {
n, err := ReadLen(s.R, s.lbuf) n, err := ReadLen(s.R, s.lbuf[:])
if err != nil { if err != nil {
return 0, err return 0, err
} }
......
...@@ -12,16 +12,21 @@ import ( ...@@ -12,16 +12,21 @@ import (
type varintWriter struct { type varintWriter struct {
W io.Writer W io.Writer
lbuf [binary.MaxVarintLen64]byte // for encoding varints pool *pool.BufferPool
lock sync.Mutex // for threadsafe writes lock sync.Mutex // for threadsafe writes
} }
// NewVarintWriter wraps an io.Writer with a varint msgio framed writer. // NewVarintWriter wraps an io.Writer with a varint msgio framed writer.
// The msgio.Writer will write the length prefix of every message written // The msgio.Writer will write the length prefix of every message written
// as a varint, using https://golang.org/pkg/encoding/binary/#PutUvarint // as a varint, using https://golang.org/pkg/encoding/binary/#PutUvarint
func NewVarintWriter(w io.Writer) WriteCloser { func NewVarintWriter(w io.Writer) WriteCloser {
return NewVarintWriterWithPool(w, pool.GlobalPool)
}
func NewVarintWriterWithPool(w io.Writer, p *pool.BufferPool) WriteCloser {
return &varintWriter{ return &varintWriter{
W: w, pool: p,
W: w,
} }
} }
...@@ -37,12 +42,12 @@ func (s *varintWriter) WriteMsg(msg []byte) error { ...@@ -37,12 +42,12 @@ func (s *varintWriter) WriteMsg(msg []byte) error {
s.lock.Lock() s.lock.Lock()
defer s.lock.Unlock() defer s.lock.Unlock()
length := uint64(len(msg)) buf := s.pool.Get(len(msg) + binary.MaxVarintLen64)
n := binary.PutUvarint(s.lbuf[:], length) n := binary.PutUvarint(buf, uint64(len(msg)))
if _, err := s.W.Write(s.lbuf[:n]); err != nil { n += copy(buf[n:], msg)
return err _, err := s.W.Write(buf[:n])
} s.pool.Put(buf)
_, err := s.W.Write(msg)
return err return err
} }
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment