diff --git a/Godeps/_workspace/src/github.com/jbenet/go-msgio/chan.go b/Godeps/_workspace/src/github.com/jbenet/go-msgio/chan.go index 8c7090ec9eb9804d3c6f55c40640e47fcce02efa..cb78109f4c92a6ae82e74eb5f250d70c641907d8 100644 --- a/Godeps/_workspace/src/github.com/jbenet/go-msgio/chan.go +++ b/Godeps/_workspace/src/github.com/jbenet/go-msgio/chan.go @@ -30,19 +30,27 @@ func NewChanWithPool(chanSize int, pool *sync.Pool) *Chan { } } -func (s *Chan) ReadFrom(r io.Reader, maxMsgLen int) { - // new buffer per message - // if bottleneck, cycle around a set of buffers - mr := NewReader(r, s.BufPool) +func (s *Chan) getBuffer(size int) []byte { if s.BufPool == nil { - s.BufPool = new(sync.Pool) - s.BufPool.New = func() interface{} { - return make([]byte, maxMsgLen) + return make([]byte, size) + } else { + bufi := s.BufPool.Get() + buf, ok := bufi.([]byte) + if !ok { + panic("Got invalid type from sync pool!") } + return buf } +} + +func (s *Chan) ReadFrom(r io.Reader, maxMsgLen int) { + // new buffer per message + // if bottleneck, cycle around a set of buffers + mr := NewReader(r) Loop: for { - buf, err := mr.ReadMsg() + buf := s.getBuffer(maxMsgLen) + l, err := mr.ReadMsg(buf) if err != nil { if err == io.EOF { break Loop // done @@ -56,7 +64,7 @@ Loop: select { case <-s.CloseChan: break Loop // told we're done - case s.MsgChan <- buf: + case s.MsgChan <- buf[:l]: // ok seems fine. send it away } } diff --git a/Godeps/_workspace/src/github.com/jbenet/go-msgio/msgio.go b/Godeps/_workspace/src/github.com/jbenet/go-msgio/msgio.go index ebf7f872b96d7442cb3c177b8efe6742f81d8819..1d3a3d05640ca0fbf93fe7c933690ae0bf0652f6 100644 --- a/Godeps/_workspace/src/github.com/jbenet/go-msgio/msgio.go +++ b/Godeps/_workspace/src/github.com/jbenet/go-msgio/msgio.go @@ -3,7 +3,6 @@ package msgio import ( "encoding/binary" "io" - "sync" ) var NBO = binary.BigEndian @@ -18,7 +17,7 @@ type WriteCloser interface { } type Reader interface { - ReadMsg() ([]byte, error) + ReadMsg([]byte) (int, error) } type ReadCloser interface { @@ -64,30 +63,22 @@ func (s *Writer_) Close() error { type Reader_ struct { R io.Reader lbuf []byte - bp *sync.Pool } -func NewReader(r io.Reader, bufpool *sync.Pool) ReadCloser { - return &Reader_{R: r, lbuf: make([]byte, 4), bp: bufpool} +func NewReader(r io.Reader) ReadCloser { + return &Reader_{r, make([]byte, 4)} } -func (s *Reader_) ReadMsg() ([]byte, error) { +func (s *Reader_) ReadMsg(msg []byte) (int, error) { if _, err := io.ReadFull(s.R, s.lbuf); err != nil { - return nil, err + return 0, err } - - bufi := s.bp.Get() - buf, ok := bufi.([]byte) - if !ok { - panic("invalid type in pool!") - } - length := int(NBO.Uint32(s.lbuf)) - if length < 0 || length > len(buf) { - return nil, io.ErrShortBuffer + if length < 0 || length > len(msg) { + return 0, io.ErrShortBuffer } - _, err := io.ReadFull(s.R, buf[:length]) - return buf[:length], err + _, err := io.ReadFull(s.R, msg[:length]) + return length, err } func (s *Reader_) Close() error { @@ -104,7 +95,7 @@ type ReadWriter_ struct { func NewReadWriter(rw io.ReadWriter) ReadWriter { return &ReadWriter_{ - Reader: NewReader(rw, nil), + Reader: NewReader(rw), Writer: NewWriter(rw), } } diff --git a/blocks/blocks.go b/blocks/blocks.go index 9bf556f5a625677ef77e935beed4752c5eb5fea1..b87cf5a321447a3ea1c972f22923c08253c8d42b 100644 --- a/blocks/blocks.go +++ b/blocks/blocks.go @@ -1,6 +1,7 @@ package blocks import ( + "errors" "fmt" mh "github.com/jbenet/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-multihash" @@ -18,6 +19,16 @@ func NewBlock(data []byte) *Block { return &Block{Data: data, Multihash: u.Hash(data)} } +func NewBlockWithHash(data []byte, h mh.Multihash) (*Block, error) { + if u.Debug { + chk := u.Hash(data) + if string(chk) != string(h) { + return nil, errors.New("Data did not match given hash!") + } + } + return &Block{Data: data, Multihash: h}, nil +} + // Key returns the block's Multihash as a Key value. func (b *Block) Key() u.Key { return u.Key(b.Multihash) diff --git a/crypto/spipe/handshake.go b/crypto/spipe/handshake.go index 37bb79ae529e64386a6ab0eaf4f9863439f13617..2b548525526317a4950463f00ac41352a4454768 100644 --- a/crypto/spipe/handshake.go +++ b/crypto/spipe/handshake.go @@ -185,44 +185,10 @@ func (s *SecurePipe) handshake() error { cmp := bytes.Compare(myPubKey, proposeResp.GetPubkey()) - if true { - mIV, tIV, mCKey, tCKey, mMKey, tMKey := ci.KeyStretcher(cmp, cipherType, hashType, secret) - - go s.handleSecureIn(hashType, cipherType, tIV, tCKey, tMKey) - go s.handleSecureOut(hashType, cipherType, mIV, mCKey, mMKey) - - } else { - log.Critical("Secure Channel Disabled! PLEASE ENSURE YOU KNOW WHAT YOU ARE DOING") - // Disable Secure Channel - go func(sp *SecurePipe) { - for { - select { - case <-sp.ctx.Done(): - return - case m, ok := <-sp.insecure.In: - if !ok { - sp.cancel() - return - } - sp.In <- m - } - } - }(s) - go func(sp *SecurePipe) { - for { - select { - case <-sp.ctx.Done(): - return - case m, ok := <-sp.Out: - if !ok { - sp.cancel() - return - } - sp.insecure.Out <- m - } - } - }(s) - } + mIV, tIV, mCKey, tCKey, mMKey, tMKey := ci.KeyStretcher(cmp, cipherType, hashType, secret) + + go s.handleSecureIn(hashType, cipherType, tIV, tCKey, tMKey) + go s.handleSecureOut(hashType, cipherType, mIV, mCKey, mMKey) finished := []byte("Finished") diff --git a/crypto/spipe/internal/pb/spipe.pb.go b/crypto/spipe/internal/pb/spipe.pb.go index 5f1b34c2bfbf5df63d906ad54769c58fd223f45c..9b5b867e2aea84e149b49047675c654e03758d11 100644 --- a/crypto/spipe/internal/pb/spipe.pb.go +++ b/crypto/spipe/internal/pb/spipe.pb.go @@ -1,4 +1,4 @@ -// Code generated by protoc-gen-go. +// Code generated by protoc-gen-gogo. // source: spipe.proto // DO NOT EDIT! @@ -15,7 +15,7 @@ It has these top-level messages: */ package spipe_pb -import proto "github.com/jbenet/go-ipfs/Godeps/_workspace/src/code.google.com/p/goprotobuf/proto" +import proto "code.google.com/p/gogoprotobuf/proto" import math "math" // Reference imports to suppress errors if they are not otherwise used. @@ -95,9 +95,10 @@ func (m *Exchange) GetSignature() []byte { } type DataSig struct { - Data []byte `protobuf:"bytes,1,opt,name=data" json:"data,omitempty"` - Sig []byte `protobuf:"bytes,2,opt,name=sig" json:"sig,omitempty"` - XXX_unrecognized []byte `json:"-"` + Data []byte `protobuf:"bytes,1,opt,name=data" json:"data,omitempty"` + Sig []byte `protobuf:"bytes,2,opt,name=sig" json:"sig,omitempty"` + Id *uint64 `protobuf:"varint,3,opt,name=id" json:"id,omitempty"` + XXX_unrecognized []byte `json:"-"` } func (m *DataSig) Reset() { *m = DataSig{} } @@ -118,5 +119,12 @@ func (m *DataSig) GetSig() []byte { return nil } +func (m *DataSig) GetId() uint64 { + if m != nil && m.Id != nil { + return *m.Id + } + return 0 +} + func init() { } diff --git a/crypto/spipe/internal/pb/spipe.proto b/crypto/spipe/internal/pb/spipe.proto index 699821b07eec94928824a866c6707220777c637f..94e4f06f7c7b38bba5b4e94cbee9261a8fa3e4bc 100644 --- a/crypto/spipe/internal/pb/spipe.proto +++ b/crypto/spipe/internal/pb/spipe.proto @@ -16,4 +16,5 @@ message Exchange { message DataSig { optional bytes data = 1; optional bytes sig = 2; + optional uint64 id = 3; } diff --git a/crypto/spipe/signedpipe.go b/crypto/spipe/signedpipe.go index 44598130533b90c8091f9c08cd98281af1c5d5e0..938b88f1b6b55da94ae479249ef2a8fe48be8126 100644 --- a/crypto/spipe/signedpipe.go +++ b/crypto/spipe/signedpipe.go @@ -24,18 +24,23 @@ type SignedPipe struct { ctx context.Context cancel context.CancelFunc + + mesid uint64 + theirmesid uint64 } +// secureChallengeSize is a constant that determines the initial challenge, and every subsequent +// sequence number. It should be large enough to be unguessable by adversaries (128+ bits). +// (SECURITY WARNING) +const secureChallengeSize = (256 / 32) + func NewSignedPipe(parctx context.Context, bufsize int, local peer.Peer, peers peer.Peerstore, insecure pipes.Duplex) (*SignedPipe, error) { ctx, cancel := context.WithCancel(parctx) sp := &SignedPipe{ - Duplex: pipes.Duplex{ - In: make(chan []byte, bufsize), - Out: make(chan []byte, bufsize), - }, + Duplex: pipes.NewDuplex(bufsize), local: local, peers: peers, insecure: insecure, @@ -51,6 +56,36 @@ func NewSignedPipe(parctx context.Context, bufsize int, local peer.Peer, return sp, nil } +func (sp *SignedPipe) trySend(b []byte) bool { + select { + case <-sp.ctx.Done(): + return false + case sp.insecure.Out <- b: + return true + } +} + +func (sp *SignedPipe) tryRecv() ([]byte, bool) { + select { + case <-sp.ctx.Done(): + return nil, false + case data, ok := <-sp.insecure.In: + if !ok { + return nil, false + } + return data, true + } +} + +func reduceChallenge(cha []byte) uint64 { + var out uint64 + for _, b := range cha { + out ^= uint64(b) + out = out << 1 + } + return out +} + func (sp *SignedPipe) handshake() error { // Send them our public key pubk := sp.local.PubKey() @@ -59,7 +94,10 @@ func (sp *SignedPipe) handshake() error { return err } - sp.insecure.Out <- pkb + // Exchange public keys with remote peer + if !sp.trySend(pkb) { + return context.Canceled + } theirPkb := <-sp.insecure.In theirPubKey, err := ci.UnmarshalPublicKey(theirPkb) @@ -67,7 +105,7 @@ func (sp *SignedPipe) handshake() error { return err } - challenge := make([]byte, 32) + challenge := make([]byte, secureChallengeSize) rand.Read(challenge) enc, err := theirPubKey.Encrypt(challenge) @@ -75,24 +113,62 @@ func (sp *SignedPipe) handshake() error { return err } - sp.insecure.Out <- enc - theirEnc := <-sp.insecure.In + chsig, err := sp.local.PrivKey().Sign(challenge) + if err != nil { + return err + } + + if !sp.trySend(enc) { + return context.Canceled + } + if !sp.trySend(chsig) { + return context.Canceled + } + + theirEnc, ok := sp.tryRecv() + if !ok { + return context.Canceled + } + theirChSig, ok := sp.tryRecv() + if !ok { + return context.Canceled + } + // Unencrypt and verify their challenge unenc, err := sp.local.PrivKey().Unencrypt(theirEnc) if err != nil { return err } + ok, err = theirPubKey.Verify(unenc, theirChSig) + if err != nil { + return err + } + if !ok { + return errors.New("Invalid signature!") + } + // Sign the unencrypted challenge, and send it back sig, err := sp.local.PrivKey().Sign(unenc) if err != nil { return err } - sp.insecure.Out <- unenc - theirUnenc := <-sp.insecure.In - sp.insecure.Out <- sig - theirSig := <-sp.insecure.In + if !sp.trySend(unenc) { + return context.Canceled + } + if !sp.trySend(sig) { + return context.Canceled + } + theirUnenc, ok := sp.tryRecv() + if !ok { + return context.Canceled + } + theirSig, ok := sp.tryRecv() + if !ok { + return context.Canceled + } + // Verify that they correctly unecrypted the challenge if !bytes.Equal(theirUnenc, challenge) { return errors.New("received bad challenge response") } @@ -106,12 +182,29 @@ func (sp *SignedPipe) handshake() error { return errors.New("Incorrect signature on challenge") } + sp.theirmesid = reduceChallenge(challenge) + sp.mesid = reduceChallenge(unenc) + go sp.handleIn(theirPubKey) - go sp.handleOut() + go sp.handleOut(sp.local.PrivKey()) finished := []byte("finished") - sp.Out <- finished - resp := <-sp.In + + select { + case <-sp.ctx.Done(): + return context.Canceled + case sp.Out <- finished: + } + + var resp []byte + select { + case <-sp.ctx.Done(): + return context.Canceled + case resp, ok = <-sp.In: + if !ok { + return errors.New("Channel closed before handshake finished.") + } + } if !bytes.Equal(resp, finished) { return errors.New("Handshake failed!") } @@ -119,7 +212,7 @@ func (sp *SignedPipe) handshake() error { return nil } -func (sp *SignedPipe) handleOut() { +func (sp *SignedPipe) handleOut(pk ci.PrivKey) { for { var data []byte var ok bool @@ -135,19 +228,21 @@ func (sp *SignedPipe) handleOut() { sdata := new(pb.DataSig) - sig, err := sp.local.PrivKey().Sign(data) + sig, err := pk.Sign(data) if err != nil { log.Error("Error signing outgoing data: %s", err) - continue + return } sdata.Data = data sdata.Sig = sig + sdata.Id = proto.Uint64(sp.mesid) b, err := proto.Marshal(sdata) if err != nil { log.Error("Error marshaling signed data object: %s", err) - continue + return } + sp.mesid++ select { case sp.insecure.Out <- b: @@ -188,6 +283,12 @@ func (sp *SignedPipe) handleIn(theirPubkey ci.PubKey) { continue } + if sdata.GetId() != sp.theirmesid { + log.Critical("Out of order message id!") + return + } + sp.theirmesid++ + select { case <-sp.ctx.Done(): return