Commit f2e428d4 authored by Juan Batiz-Benet's avatar Juan Batiz-Benet

moved versionhandshake to conn

parent d47115bc
...@@ -2,6 +2,7 @@ package conn ...@@ -2,6 +2,7 @@ package conn
import ( import (
"fmt" "fmt"
"time"
context "github.com/jbenet/go-ipfs/Godeps/_workspace/src/code.google.com/p/go.net/context" context "github.com/jbenet/go-ipfs/Godeps/_workspace/src/code.google.com/p/go.net/context"
msgio "github.com/jbenet/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-msgio" msgio "github.com/jbenet/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-msgio"
...@@ -14,11 +15,16 @@ import ( ...@@ -14,11 +15,16 @@ import (
var log = u.Logger("conn") var log = u.Logger("conn")
// ChanBuffer is the size of the buffer in the Conn Chan const (
const ChanBuffer = 10 // ChanBuffer is the size of the buffer in the Conn Chan
ChanBuffer = 10
// 1 MB // MaxMessageSize is the size of the largest single message
const MaxMessageSize = 1 << 20 MaxMessageSize = 1 << 20 // 1 MB
// HandshakeTimeout for when nodes first connect
HandshakeTimeout = time.Second * 5
)
// msgioPipe is a pipe using msgio channels. // msgioPipe is a pipe using msgio channels.
type msgioPipe struct { type msgioPipe struct {
...@@ -62,6 +68,13 @@ func newSingleConn(ctx context.Context, local, remote *peer.Peer, ...@@ -62,6 +68,13 @@ func newSingleConn(ctx context.Context, local, remote *peer.Peer,
go conn.msgio.outgoing.WriteTo(maconn) go conn.msgio.outgoing.WriteTo(maconn)
go conn.msgio.incoming.ReadFrom(maconn, MaxMessageSize) go conn.msgio.incoming.ReadFrom(maconn, MaxMessageSize)
// version handshake
ctxT, _ := context.WithTimeout(ctx, HandshakeTimeout)
if err := VersionHandshake(ctxT, conn); err != nil {
conn.Close()
return nil, fmt.Errorf("Version handshake: %s", err)
}
return conn, nil return conn, nil
} }
......
package conn
import (
"errors"
"fmt"
handshake "github.com/jbenet/go-ipfs/net/handshake"
context "github.com/jbenet/go-ipfs/Godeps/_workspace/src/code.google.com/p/go.net/context"
proto "github.com/jbenet/go-ipfs/Godeps/_workspace/src/code.google.com/p/goprotobuf/proto"
)
// VersionHandshake exchanges local and remote versions and compares them
// closes remote and returns an error in case of major difference
func VersionHandshake(ctx context.Context, c Conn) error {
rpeer := c.RemotePeer()
lpeer := c.LocalPeer()
var remoteH, localH *handshake.Handshake1
localH = handshake.CurrentHandshake()
myVerBytes, err := proto.Marshal(localH)
if err != nil {
return err
}
c.Out() <- myVerBytes
log.Debug("Sent my version (%s) to %s", localH, rpeer)
select {
case <-ctx.Done():
return ctx.Err()
case <-c.Done():
return errors.New("remote closed connection during version exchange")
case data, ok := <-c.In():
if !ok {
return fmt.Errorf("error retrieving from conn: %v", rpeer)
}
remoteH = new(handshake.Handshake1)
err = proto.Unmarshal(data, remoteH)
if err != nil {
return fmt.Errorf("could not decode remote version: %q", err)
}
log.Debug("Received remote version (%s) from %s", remoteH, rpeer)
}
if err := handshake.Compatible(localH, remoteH); err != nil {
log.Info("%s (%s) incompatible version with %s (%s)", lpeer, localH, rpeer, remoteH)
return err
}
log.Debug("%s version handshake compatible %s", lpeer, rpeer)
return nil
}
...@@ -5,10 +5,8 @@ import ( ...@@ -5,10 +5,8 @@ import (
"fmt" "fmt"
conn "github.com/jbenet/go-ipfs/net/conn" conn "github.com/jbenet/go-ipfs/net/conn"
handshake "github.com/jbenet/go-ipfs/net/handshake"
msg "github.com/jbenet/go-ipfs/net/message" msg "github.com/jbenet/go-ipfs/net/message"
proto "github.com/jbenet/go-ipfs/Godeps/_workspace/src/code.google.com/p/goprotobuf/proto"
ma "github.com/jbenet/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-multiaddr" ma "github.com/jbenet/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-multiaddr"
) )
...@@ -93,10 +91,6 @@ func (s *Swarm) connSetup(c conn.Conn) error { ...@@ -93,10 +91,6 @@ func (s *Swarm) connSetup(c conn.Conn) error {
// addresses should be figured out through the DHT. // addresses should be figured out through the DHT.
// c.Remote.AddAddress(c.Conn.RemoteMultiaddr()) // c.Remote.AddAddress(c.Conn.RemoteMultiaddr())
if err := s.connVersionExchange(c); err != nil {
return fmt.Errorf("Conn version exchange error: %v", err)
}
// add to conns // add to conns
s.connsLock.Lock() s.connsLock.Lock()
if _, ok := s.conns[c.RemotePeer().Key()]; ok { if _, ok := s.conns[c.RemotePeer().Key()]; ok {
...@@ -113,54 +107,6 @@ func (s *Swarm) connSetup(c conn.Conn) error { ...@@ -113,54 +107,6 @@ func (s *Swarm) connSetup(c conn.Conn) error {
return nil return nil
} }
// connVersionExchange exchanges local and remote versions and compares them
// closes remote and returns an error in case of major difference
func (s *Swarm) connVersionExchange(r conn.Conn) error {
rpeer := r.RemotePeer()
var remoteH, localH *handshake.Handshake1
localH = handshake.CurrentHandshake()
myVerBytes, err := proto.Marshal(localH)
if err != nil {
return err
}
r.Out() <- myVerBytes
log.Debug("Sent my version(%s) [to = %s]", localH, rpeer)
select {
case <-s.ctx.Done():
return s.ctx.Err()
// case <-remote.Done():
// return errors.New("remote closed connection during version exchange")
case data, ok := <-r.In():
if !ok {
return fmt.Errorf("Error retrieving from conn: %v", rpeer)
}
remoteH = new(handshake.Handshake1)
err = proto.Unmarshal(data, remoteH)
if err != nil {
s.Close()
return fmt.Errorf("connSetup: could not decode remote version: %q", err)
}
log.Debug("Received remote version(%s) [from = %s]", remoteH, rpeer)
}
if err := handshake.Compatible(localH, remoteH); err != nil {
log.Info("%s (%s) incompatible version with %s (%s)", s.local, localH, rpeer, remoteH)
r.Close()
return err
}
log.Debug("[peer: %s] Version compatible", rpeer)
return nil
}
// Handles the unwrapping + sending of messages to the right connection. // Handles the unwrapping + sending of messages to the right connection.
func (s *Swarm) fanOut() { func (s *Swarm) fanOut() {
for { for {
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment