Commit 95095724 authored by Jeromy's avatar Jeromy

address comments from PR

parent a8069024
......@@ -30,19 +30,27 @@ func NewChanWithPool(chanSize int, pool *sync.Pool) *Chan {
func (s *Chan) ReadFrom(r io.Reader, maxMsgLen int) {
// new buffer per message
// if bottleneck, cycle around a set of buffers
mr := NewReader(r, s.BufPool)
func (s *Chan) getBuffer(size int) []byte {
if s.BufPool == nil {
s.BufPool = new(sync.Pool)
s.BufPool.New = func() interface{} {
return make([]byte, maxMsgLen)
return make([]byte, size)
} else {
bufi := s.BufPool.Get()
buf, ok := bufi.([]byte)
if !ok {
panic("Got invalid type from sync pool!")
return buf
func (s *Chan) ReadFrom(r io.Reader, maxMsgLen int) {
// new buffer per message
// if bottleneck, cycle around a set of buffers
mr := NewReader(r)
for {
buf, err := mr.ReadMsg()
buf := s.getBuffer(maxMsgLen)
l, err := mr.ReadMsg(buf)
if err != nil {
if err == io.EOF {
break Loop // done
......@@ -56,7 +64,7 @@ Loop:
select {
case <-s.CloseChan:
break Loop // told we're done
case s.MsgChan <- buf:
case s.MsgChan <- buf[:l]:
// ok seems fine. send it away
......@@ -3,7 +3,6 @@ package msgio
import (
var NBO = binary.BigEndian
......@@ -18,7 +17,7 @@ type WriteCloser interface {
type Reader interface {
ReadMsg() ([]byte, error)
ReadMsg([]byte) (int, error)
type ReadCloser interface {
......@@ -64,30 +63,22 @@ func (s *Writer_) Close() error {
type Reader_ struct {
R io.Reader
lbuf []byte
bp *sync.Pool
func NewReader(r io.Reader, bufpool *sync.Pool) ReadCloser {
return &Reader_{R: r, lbuf: make([]byte, 4), bp: bufpool}
func NewReader(r io.Reader) ReadCloser {
return &Reader_{r, make([]byte, 4)}
func (s *Reader_) ReadMsg() ([]byte, error) {
func (s *Reader_) ReadMsg(msg []byte) (int, error) {
if _, err := io.ReadFull(s.R, s.lbuf); err != nil {
return nil, err
return 0, err
bufi := s.bp.Get()
buf, ok := bufi.([]byte)
if !ok {
panic("invalid type in pool!")
length := int(NBO.Uint32(s.lbuf))
if length < 0 || length > len(buf) {
return nil, io.ErrShortBuffer
if length < 0 || length > len(msg) {
return 0, io.ErrShortBuffer
_, err := io.ReadFull(s.R, buf[:length])
return buf[:length], err
_, err := io.ReadFull(s.R, msg[:length])
return length, err
func (s *Reader_) Close() error {
......@@ -104,7 +95,7 @@ type ReadWriter_ struct {
func NewReadWriter(rw io.ReadWriter) ReadWriter {
return &ReadWriter_{
Reader: NewReader(rw, nil),
Reader: NewReader(rw),
Writer: NewWriter(rw),
package blocks
import (
mh ""
......@@ -18,6 +19,16 @@ func NewBlock(data []byte) *Block {
return &Block{Data: data, Multihash: u.Hash(data)}
func NewBlockWithHash(data []byte, h mh.Multihash) (*Block, error) {
if u.Debug {
chk := u.Hash(data)
if string(chk) != string(h) {
return nil, errors.New("Data did not match given hash!")
return &Block{Data: data, Multihash: h}, nil
// Key returns the block's Multihash as a Key value.
func (b *Block) Key() u.Key {
return u.Key(b.Multihash)
......@@ -185,44 +185,10 @@ func (s *SecurePipe) handshake() error {
cmp := bytes.Compare(myPubKey, proposeResp.GetPubkey())
if true {
mIV, tIV, mCKey, tCKey, mMKey, tMKey := ci.KeyStretcher(cmp, cipherType, hashType, secret)
go s.handleSecureIn(hashType, cipherType, tIV, tCKey, tMKey)
go s.handleSecureOut(hashType, cipherType, mIV, mCKey, mMKey)
} else {
log.Critical("Secure Channel Disabled! PLEASE ENSURE YOU KNOW WHAT YOU ARE DOING")
// Disable Secure Channel
go func(sp *SecurePipe) {
for {
select {
case <-sp.ctx.Done():
case m, ok := <-sp.insecure.In:
if !ok {
sp.In <- m
go func(sp *SecurePipe) {
for {
select {
case <-sp.ctx.Done():
case m, ok := <-sp.Out:
if !ok {
sp.insecure.Out <- m
mIV, tIV, mCKey, tCKey, mMKey, tMKey := ci.KeyStretcher(cmp, cipherType, hashType, secret)
go s.handleSecureIn(hashType, cipherType, tIV, tCKey, tMKey)
go s.handleSecureOut(hashType, cipherType, mIV, mCKey, mMKey)
finished := []byte("Finished")
// Code generated by protoc-gen-go.
// Code generated by protoc-gen-gogo.
// source: spipe.proto
......@@ -15,7 +15,7 @@ It has these top-level messages:
package spipe_pb
import proto ""
import proto ""
import math "math"
// Reference imports to suppress errors if they are not otherwise used.
......@@ -95,9 +95,10 @@ func (m *Exchange) GetSignature() []byte {
type DataSig struct {
Data []byte `protobuf:"bytes,1,opt,name=data" json:"data,omitempty"`
Sig []byte `protobuf:"bytes,2,opt,name=sig" json:"sig,omitempty"`
XXX_unrecognized []byte `json:"-"`
Data []byte `protobuf:"bytes,1,opt,name=data" json:"data,omitempty"`
Sig []byte `protobuf:"bytes,2,opt,name=sig" json:"sig,omitempty"`
Id *uint64 `protobuf:"varint,3,opt,name=id" json:"id,omitempty"`
XXX_unrecognized []byte `json:"-"`
func (m *DataSig) Reset() { *m = DataSig{} }
......@@ -118,5 +119,12 @@ func (m *DataSig) GetSig() []byte {
return nil
func (m *DataSig) GetId() uint64 {
if m != nil && m.Id != nil {
return *m.Id
return 0
func init() {
......@@ -16,4 +16,5 @@ message Exchange {
message DataSig {
optional bytes data = 1;
optional bytes sig = 2;
optional uint64 id = 3;
......@@ -24,18 +24,23 @@ type SignedPipe struct {
ctx context.Context
cancel context.CancelFunc
mesid uint64
theirmesid uint64
// secureChallengeSize is a constant that determines the initial challenge, and every subsequent
// sequence number. It should be large enough to be unguessable by adversaries (128+ bits).
const secureChallengeSize = (256 / 32)
func NewSignedPipe(parctx context.Context, bufsize int, local peer.Peer,
peers peer.Peerstore, insecure pipes.Duplex) (*SignedPipe, error) {
ctx, cancel := context.WithCancel(parctx)
sp := &SignedPipe{
Duplex: pipes.Duplex{
In: make(chan []byte, bufsize),
Out: make(chan []byte, bufsize),
Duplex: pipes.NewDuplex(bufsize),
local: local,
peers: peers,
insecure: insecure,
......@@ -51,6 +56,36 @@ func NewSignedPipe(parctx context.Context, bufsize int, local peer.Peer,
return sp, nil
func (sp *SignedPipe) trySend(b []byte) bool {
select {
case <-sp.ctx.Done():
return false
case sp.insecure.Out <- b:
return true
func (sp *SignedPipe) tryRecv() ([]byte, bool) {
select {
case <-sp.ctx.Done():
return nil, false
case data, ok := <-sp.insecure.In:
if !ok {
return nil, false
return data, true
func reduceChallenge(cha []byte) uint64 {
var out uint64
for _, b := range cha {
out ^= uint64(b)
out = out << 1
return out
func (sp *SignedPipe) handshake() error {
// Send them our public key
pubk := sp.local.PubKey()
......@@ -59,7 +94,10 @@ func (sp *SignedPipe) handshake() error {
return err
sp.insecure.Out <- pkb
// Exchange public keys with remote peer
if !sp.trySend(pkb) {
return context.Canceled
theirPkb := <-sp.insecure.In
theirPubKey, err := ci.UnmarshalPublicKey(theirPkb)
......@@ -67,7 +105,7 @@ func (sp *SignedPipe) handshake() error {
return err
challenge := make([]byte, 32)
challenge := make([]byte, secureChallengeSize)
enc, err := theirPubKey.Encrypt(challenge)
......@@ -75,24 +113,62 @@ func (sp *SignedPipe) handshake() error {
return err
sp.insecure.Out <- enc
theirEnc := <-sp.insecure.In
chsig, err := sp.local.PrivKey().Sign(challenge)
if err != nil {
return err
if !sp.trySend(enc) {
return context.Canceled
if !sp.trySend(chsig) {
return context.Canceled
theirEnc, ok := sp.tryRecv()
if !ok {
return context.Canceled
theirChSig, ok := sp.tryRecv()
if !ok {
return context.Canceled
// Unencrypt and verify their challenge
unenc, err := sp.local.PrivKey().Unencrypt(theirEnc)
if err != nil {
return err
ok, err = theirPubKey.Verify(unenc, theirChSig)
if err != nil {
return err
if !ok {
return errors.New("Invalid signature!")
// Sign the unencrypted challenge, and send it back
sig, err := sp.local.PrivKey().Sign(unenc)
if err != nil {
return err
sp.insecure.Out <- unenc
theirUnenc := <-sp.insecure.In
sp.insecure.Out <- sig
theirSig := <-sp.insecure.In
if !sp.trySend(unenc) {
return context.Canceled
if !sp.trySend(sig) {
return context.Canceled
theirUnenc, ok := sp.tryRecv()
if !ok {
return context.Canceled
theirSig, ok := sp.tryRecv()
if !ok {
return context.Canceled
// Verify that they correctly unecrypted the challenge
if !bytes.Equal(theirUnenc, challenge) {
return errors.New("received bad challenge response")
......@@ -106,12 +182,29 @@ func (sp *SignedPipe) handshake() error {
return errors.New("Incorrect signature on challenge")
sp.theirmesid = reduceChallenge(challenge)
sp.mesid = reduceChallenge(unenc)
go sp.handleIn(theirPubKey)
go sp.handleOut()
go sp.handleOut(sp.local.PrivKey())
finished := []byte("finished")
sp.Out <- finished
resp := <-sp.In
select {
case <-sp.ctx.Done():
return context.Canceled
case sp.Out <- finished:
var resp []byte
select {
case <-sp.ctx.Done():
return context.Canceled
case resp, ok = <-sp.In:
if !ok {
return errors.New("Channel closed before handshake finished.")
if !bytes.Equal(resp, finished) {
return errors.New("Handshake failed!")
......@@ -119,7 +212,7 @@ func (sp *SignedPipe) handshake() error {
return nil
func (sp *SignedPipe) handleOut() {
func (sp *SignedPipe) handleOut(pk ci.PrivKey) {
for {
var data []byte
var ok bool
......@@ -135,19 +228,21 @@ func (sp *SignedPipe) handleOut() {
sdata := new(pb.DataSig)
sig, err := sp.local.PrivKey().Sign(data)
sig, err := pk.Sign(data)
if err != nil {
log.Error("Error signing outgoing data: %s", err)
sdata.Data = data
sdata.Sig = sig
sdata.Id = proto.Uint64(sp.mesid)
b, err := proto.Marshal(sdata)
if err != nil {
log.Error("Error marshaling signed data object: %s", err)
select {
case sp.insecure.Out <- b:
......@@ -188,6 +283,12 @@ func (sp *SignedPipe) handleIn(theirPubkey ci.PubKey) {
if sdata.GetId() != sp.theirmesid {
log.Critical("Out of order message id!")
select {
case <-sp.ctx.Done():
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment