Commit a8069024 authored by Jeromy's avatar Jeromy

cut down on allocations

parent 02792054
......@@ -33,7 +33,7 @@ func NewChanWithPool(chanSize int, pool *sync.Pool) *Chan {
func (s *Chan) ReadFrom(r io.Reader, maxMsgLen int) {
// new buffer per message
// if bottleneck, cycle around a set of buffers
mr := NewReader(r)
mr := NewReader(r, s.BufPool)
if s.BufPool == nil {
s.BufPool = new(sync.Pool)
s.BufPool.New = func() interface{} {
......@@ -42,12 +42,7 @@ func (s *Chan) ReadFrom(r io.Reader, maxMsgLen int) {
}
Loop:
for {
bufi := s.BufPool.Get()
buf, ok := bufi.([]byte)
if !ok {
panic("Got invalid type from sync pool!")
}
l, err := mr.ReadMsg(buf)
buf, err := mr.ReadMsg()
if err != nil {
if err == io.EOF {
break Loop // done
......@@ -61,7 +56,7 @@ Loop:
select {
case <-s.CloseChan:
break Loop // told we're done
case s.MsgChan <- buf[:l]:
case s.MsgChan <- buf:
// ok seems fine. send it away
}
}
......
......@@ -3,6 +3,7 @@ package msgio
import (
"encoding/binary"
"io"
"sync"
)
var NBO = binary.BigEndian
......@@ -17,7 +18,7 @@ type WriteCloser interface {
}
type Reader interface {
ReadMsg([]byte) (int, error)
ReadMsg() ([]byte, error)
}
type ReadCloser interface {
......@@ -63,22 +64,30 @@ func (s *Writer_) Close() error {
type Reader_ struct {
R io.Reader
lbuf []byte
bp *sync.Pool
}
func NewReader(r io.Reader) ReadCloser {
return &Reader_{r, make([]byte, 4)}
func NewReader(r io.Reader, bufpool *sync.Pool) ReadCloser {
return &Reader_{R: r, lbuf: make([]byte, 4), bp: bufpool}
}
func (s *Reader_) ReadMsg(msg []byte) (int, error) {
func (s *Reader_) ReadMsg() ([]byte, error) {
if _, err := io.ReadFull(s.R, s.lbuf); err != nil {
return 0, err
return nil, err
}
bufi := s.bp.Get()
buf, ok := bufi.([]byte)
if !ok {
panic("invalid type in pool!")
}
length := int(NBO.Uint32(s.lbuf))
if length < 0 || length > len(msg) {
return 0, io.ErrShortBuffer
if length < 0 || length > len(buf) {
return nil, io.ErrShortBuffer
}
_, err := io.ReadFull(s.R, msg[:length])
return length, err
_, err := io.ReadFull(s.R, buf[:length])
return buf[:length], err
}
func (s *Reader_) Close() error {
......@@ -95,7 +104,7 @@ type ReadWriter_ struct {
func NewReadWriter(rw io.ReadWriter) ReadWriter {
return &ReadWriter_{
Reader: NewReader(rw),
Reader: NewReader(rw, nil),
Writer: NewWriter(rw),
}
}
......
......@@ -303,21 +303,20 @@ func (s *SecurePipe) handleSecureIn(hashType, cipherType string, tIV, tCKey, tMK
}
mark := len(data) - macSize
buff := make([]byte, mark)
theirCipher.XORKeyStream(buff, data[0:mark])
theirMac.Write(data[0:mark])
expected := theirMac.Sum(nil)
theirMac.Reset()
hmacOk := hmac.Equal(data[mark:], expected)
if hmacOk {
s.Duplex.In <- buff
} else {
if !hmacOk {
s.Duplex.In <- nil
continue
}
theirCipher.XORKeyStream(data, data[0:mark])
s.Duplex.In <- data[:mark]
}
}
......
......@@ -41,7 +41,7 @@ func init() {
func ReleaseBuffer(b []byte) {
log.Warningf("Releasing buffer! (cap,size = %d, %d)", cap(b), len(b))
if cap(b) != MaxMessageSize {
log.Warning("Release buffer failed.")
log.Warning("Release buffer failed (cap, size = %d, %d)", cap(b), len(b))
return
}
BufferPool.Put(b[:cap(b)])
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment