diff --git a/Godeps/_workspace/src/github.com/jbenet/go-msgio/chan.go b/Godeps/_workspace/src/github.com/jbenet/go-msgio/chan.go index 1c16a9f0ff12255f9b21c58ccd45c91afc33aa64..8c7090ec9eb9804d3c6f55c40640e47fcce02efa 100644 --- a/Godeps/_workspace/src/github.com/jbenet/go-msgio/chan.go +++ b/Godeps/_workspace/src/github.com/jbenet/go-msgio/chan.go @@ -33,7 +33,7 @@ func NewChanWithPool(chanSize int, pool *sync.Pool) *Chan { func (s *Chan) ReadFrom(r io.Reader, maxMsgLen int) { // new buffer per message // if bottleneck, cycle around a set of buffers - mr := NewReader(r) + mr := NewReader(r, s.BufPool) if s.BufPool == nil { s.BufPool = new(sync.Pool) s.BufPool.New = func() interface{} { @@ -42,12 +42,7 @@ func (s *Chan) ReadFrom(r io.Reader, maxMsgLen int) { } Loop: for { - bufi := s.BufPool.Get() - buf, ok := bufi.([]byte) - if !ok { - panic("Got invalid type from sync pool!") - } - l, err := mr.ReadMsg(buf) + buf, err := mr.ReadMsg() if err != nil { if err == io.EOF { break Loop // done @@ -61,7 +56,7 @@ Loop: select { case <-s.CloseChan: break Loop // told we're done - case s.MsgChan <- buf[:l]: + case s.MsgChan <- buf: // ok seems fine. send it away } } diff --git a/Godeps/_workspace/src/github.com/jbenet/go-msgio/msgio.go b/Godeps/_workspace/src/github.com/jbenet/go-msgio/msgio.go index 1d3a3d05640ca0fbf93fe7c933690ae0bf0652f6..ebf7f872b96d7442cb3c177b8efe6742f81d8819 100644 --- a/Godeps/_workspace/src/github.com/jbenet/go-msgio/msgio.go +++ b/Godeps/_workspace/src/github.com/jbenet/go-msgio/msgio.go @@ -3,6 +3,7 @@ package msgio import ( "encoding/binary" "io" + "sync" ) var NBO = binary.BigEndian @@ -17,7 +18,7 @@ type WriteCloser interface { } type Reader interface { - ReadMsg([]byte) (int, error) + ReadMsg() ([]byte, error) } type ReadCloser interface { @@ -63,22 +64,30 @@ func (s *Writer_) Close() error { type Reader_ struct { R io.Reader lbuf []byte + bp *sync.Pool } -func NewReader(r io.Reader) ReadCloser { - return &Reader_{r, make([]byte, 4)} +func NewReader(r io.Reader, bufpool *sync.Pool) ReadCloser { + return &Reader_{R: r, lbuf: make([]byte, 4), bp: bufpool} } -func (s *Reader_) ReadMsg(msg []byte) (int, error) { +func (s *Reader_) ReadMsg() ([]byte, error) { if _, err := io.ReadFull(s.R, s.lbuf); err != nil { - return 0, err + return nil, err } + + bufi := s.bp.Get() + buf, ok := bufi.([]byte) + if !ok { + panic("invalid type in pool!") + } + length := int(NBO.Uint32(s.lbuf)) - if length < 0 || length > len(msg) { - return 0, io.ErrShortBuffer + if length < 0 || length > len(buf) { + return nil, io.ErrShortBuffer } - _, err := io.ReadFull(s.R, msg[:length]) - return length, err + _, err := io.ReadFull(s.R, buf[:length]) + return buf[:length], err } func (s *Reader_) Close() error { @@ -95,7 +104,7 @@ type ReadWriter_ struct { func NewReadWriter(rw io.ReadWriter) ReadWriter { return &ReadWriter_{ - Reader: NewReader(rw), + Reader: NewReader(rw, nil), Writer: NewWriter(rw), } } diff --git a/crypto/spipe/handshake.go b/crypto/spipe/handshake.go index e412ecd7a8e9771cba95c8a3066317976d4a5f0a..37bb79ae529e64386a6ab0eaf4f9863439f13617 100644 --- a/crypto/spipe/handshake.go +++ b/crypto/spipe/handshake.go @@ -303,21 +303,20 @@ func (s *SecurePipe) handleSecureIn(hashType, cipherType string, tIV, tCKey, tMK } mark := len(data) - macSize - buff := make([]byte, mark) - - theirCipher.XORKeyStream(buff, data[0:mark]) theirMac.Write(data[0:mark]) expected := theirMac.Sum(nil) theirMac.Reset() hmacOk := hmac.Equal(data[mark:], expected) - - if hmacOk { - s.Duplex.In <- buff - } else { + if !hmacOk { s.Duplex.In <- nil + continue } + + theirCipher.XORKeyStream(data, data[0:mark]) + + s.Duplex.In <- data[:mark] } } diff --git a/net/conn/conn.go b/net/conn/conn.go index 272a31c3b89a2af2b1425c71e3dd6eae128a6477..67384420cf85a3e1f6143e0c55b11a1b733eb2f5 100644 --- a/net/conn/conn.go +++ b/net/conn/conn.go @@ -41,7 +41,7 @@ func init() { func ReleaseBuffer(b []byte) { log.Warningf("Releasing buffer! (cap,size = %d, %d)", cap(b), len(b)) if cap(b) != MaxMessageSize { - log.Warning("Release buffer failed.") + log.Warning("Release buffer failed (cap, size = %d, %d)", cap(b), len(b)) return } BufferPool.Put(b[:cap(b)])