Commit 1edc5a46 authored by Juan Batiz-Benet's avatar Juan Batiz-Benet

updated Conn and Swarm

This Commit changes the relationship between Conn and Swarm.
After this, Conn is significantly more autonomous, and follows
an interface.

From here, it will be very easy to make the MultiConn (that
handles multiple Conns per peer).
parent 5681e273
......@@ -52,7 +52,7 @@ type singleConn struct {
}
// Map maps Keys (Peer.IDs) to Connections.
type Map map[u.Key]*Conn
type Map map[u.Key]Conn
// newConn constructs a new connection
func newSingleConn(ctx context.Context, local, remote *peer.Peer,
......@@ -171,6 +171,7 @@ func (c *singleConn) waitToClose(ctx context.Context) {
// closing channels
c.insecure.outgoing.Close()
c.secure.Close()
close(c.msgpipe.Incoming)
}
// IsOpen returns whether this Conn is open or closed.
......
......@@ -4,14 +4,12 @@ import (
"errors"
"fmt"
spipe "github.com/jbenet/go-ipfs/crypto/spipe"
conn "github.com/jbenet/go-ipfs/net/conn"
handshake "github.com/jbenet/go-ipfs/net/handshake"
msg "github.com/jbenet/go-ipfs/net/message"
proto "github.com/jbenet/go-ipfs/Godeps/_workspace/src/code.google.com/p/goprotobuf/proto"
ma "github.com/jbenet/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-multiaddr"
manet "github.com/jbenet/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-multiaddr/net"
)
// Open listeners for each network the swarm should listen on
......@@ -39,7 +37,8 @@ func (s *Swarm) listen() error {
// Listen for new connections on the given multiaddr
func (s *Swarm) connListen(maddr ma.Multiaddr) error {
list, err := manet.Listen(maddr)
list, err := conn.Listen(s.ctx, maddr, s.local, s.peers)
if err != nil {
return err
}
......@@ -55,17 +54,12 @@ func (s *Swarm) connListen(maddr ma.Multiaddr) error {
// Accept and handle new connections on this listener until it errors
go func() {
for {
nconn, err := list.Accept()
if err != nil {
e := fmt.Errorf("Failed to accept connection: %s - %s", maddr, err)
s.errChan <- e
select {
case <-s.ctx.Done():
return
// if cancel is nil, we're closed.
if s.cancel == nil {
return
}
} else {
go s.handleIncomingConn(nconn)
case conn := <-list.Accept():
go s.handleIncomingConn(conn)
}
}
}()
......@@ -74,42 +68,24 @@ func (s *Swarm) connListen(maddr ma.Multiaddr) error {
}
// Handle getting ID from this peer, handshake, and adding it into the map
func (s *Swarm) handleIncomingConn(nconn manet.Conn) {
// Construct conn with nil peer for now, because we don't know its ID yet.
// connSetup will figure this out, and pull out / construct the peer.
c, err := conn.NewConn(s.local, nil, nconn)
if err != nil {
s.errChan <- err
return
}
func (s *Swarm) handleIncomingConn(nconn conn.Conn) {
// Setup the new connection
err = s.connSetup(c)
err := s.connSetup(nconn)
if err != nil && err != ErrAlreadyOpen {
s.errChan <- err
c.Close()
nconn.Close()
}
}
// connSetup adds the passed in connection to its peerMap and starts
// the fanIn routine for that connection
func (s *Swarm) connSetup(c *conn.Conn) error {
func (s *Swarm) connSetup(c conn.Conn) error {
if c == nil {
return errors.New("Tried to start nil connection.")
}
if c.Remote != nil {
log.Debug("%s Starting connection: %s", c.Local, c.Remote)
} else {
log.Debug("%s Starting connection: [unknown peer]", c.Local)
}
if err := s.connSecure(c); err != nil {
return fmt.Errorf("Conn securing error: %v", err)
}
log.Debug("%s secured connection: %s", c.Local, c.Remote)
log.Debug("%s Started connection: %s", c.LocalPeer(), c.RemotePeer())
// add address of connection to Peer. Maybe it should happen in connSecure.
// NOT adding this address here, because the incoming address in TCP
......@@ -123,12 +99,12 @@ func (s *Swarm) connSetup(c *conn.Conn) error {
// add to conns
s.connsLock.Lock()
if _, ok := s.conns[c.Remote.Key()]; ok {
if _, ok := s.conns[c.RemotePeer().Key()]; ok {
log.Debug("Conn already open!")
s.connsLock.Unlock()
return ErrAlreadyOpen
}
s.conns[c.Remote.Key()] = c
s.conns[c.RemotePeer().Key()] = c
log.Debug("Added conn to map!")
s.connsLock.Unlock()
......@@ -137,77 +113,51 @@ func (s *Swarm) connSetup(c *conn.Conn) error {
return nil
}
// connSecure setups a secure remote connection.
func (s *Swarm) connSecure(c *conn.Conn) error {
sp, err := spipe.NewSecurePipe(s.ctx, 10, s.local, s.peers)
if err != nil {
return err
}
err = sp.Wrap(s.ctx, spipe.Duplex{
In: c.Incoming.MsgChan,
Out: c.Outgoing.MsgChan,
})
if err != nil {
return err
}
if c.Remote == nil {
c.Remote = sp.RemotePeer()
} else if c.Remote != sp.RemotePeer() {
panic("peers not being constructed correctly.")
}
c.Secure = sp
return nil
}
// connVersionExchange exchanges local and remote versions and compares them
// closes remote and returns an error in case of major difference
func (s *Swarm) connVersionExchange(remote *conn.Conn) error {
var remoteHandshake, localHandshake *handshake.Handshake1
localHandshake = handshake.CurrentHandshake()
func (s *Swarm) connVersionExchange(r conn.Conn) error {
rpeer := r.RemotePeer()
myVerBytes, err := proto.Marshal(localHandshake)
var remoteH, localH *handshake.Handshake1
localH = handshake.CurrentHandshake()
myVerBytes, err := proto.Marshal(localH)
if err != nil {
return err
}
remote.Secure.Out <- myVerBytes
log.Debug("Send my version(%s) [to = %s]", localHandshake, remote.Peer)
r.MsgOut() <- msg.New(rpeer, myVerBytes)
log.Debug("Sent my version(%s) [to = %s]", localH, rpeer)
select {
case <-s.ctx.Done():
return s.ctx.Err()
case <-remote.Closed:
return errors.New("remote closed connection during version exchange")
// case <-remote.Done():
// return errors.New("remote closed connection during version exchange")
case data, ok := <-remote.Secure.In:
case data, ok := <-r.MsgIn():
if !ok {
return fmt.Errorf("Error retrieving from conn: %v", remote.Peer)
return fmt.Errorf("Error retrieving from conn: %v", rpeer)
}
remoteHandshake = new(handshake.Handshake1)
err = proto.Unmarshal(data, remoteHandshake)
remoteH = new(handshake.Handshake1)
err = proto.Unmarshal(data.Data(), remoteH)
if err != nil {
s.Close()
return fmt.Errorf("connSetup: could not decode remote version: %q", err)
}
log.Debug("Received remote version(%s) [from = %s]", remoteHandshake, remote.Peer)
log.Debug("Received remote version(%s) [from = %s]", remoteH, rpeer)
}
if err := handshake.Compatible(localHandshake, remoteHandshake); err != nil {
log.Info("%s (%s) incompatible version with %s (%s)", s.local, localHandshake, remote.Peer, remoteHandshake)
remote.Close()
if err := handshake.Compatible(localH, remoteH); err != nil {
log.Info("%s (%s) incompatible version with %s (%s)", s.local, localH, rpeer, remoteH)
r.Close()
return err
}
log.Debug("[peer: %s] Version compatible", remote.Peer)
log.Debug("[peer: %s] Version compatible", rpeer)
return nil
}
......@@ -237,14 +187,14 @@ func (s *Swarm) fanOut() {
// log.Debug("[peer: %s] Sent message [to = %s]", s.local, msg.Peer())
// queue it in the connection's buffer
conn.Secure.Out <- msg.Data()
conn.MsgOut() <- msg
}
}
}
// Handles the receiving + wrapping of messages, per conn.
// Consider using reflect.Select with one goroutine instead of n.
func (s *Swarm) fanIn(c *conn.Conn) {
func (s *Swarm) fanIn(c conn.Conn) {
for {
select {
case <-s.ctx.Done():
......@@ -252,26 +202,21 @@ func (s *Swarm) fanIn(c *conn.Conn) {
c.Close()
goto out
case <-c.Closed:
goto out
case data, ok := <-c.Secure.In:
case data, ok := <-c.MsgIn():
if !ok {
e := fmt.Errorf("Error retrieving from conn: %v", c.Remote)
e := fmt.Errorf("Error retrieving from conn: %v", c.RemotePeer())
s.errChan <- e
goto out
}
// log.Debug("[peer: %s] Received message [from = %s]", s.local, c.Peer)
msg := msg.New(c.Remote, data)
s.Incoming <- msg
s.Incoming <- data
}
}
out:
s.connsLock.Lock()
delete(s.conns, c.Remote.Key())
delete(s.conns, c.RemotePeer().Key())
s.connsLock.Unlock()
}
......
......@@ -11,7 +11,6 @@ import (
u "github.com/jbenet/go-ipfs/util"
context "github.com/jbenet/go-ipfs/Godeps/_workspace/src/code.google.com/p/go.net/context"
manet "github.com/jbenet/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-multiaddr/net"
)
var log = u.Logger("swarm")
......@@ -61,7 +60,7 @@ type Swarm struct {
connsLock sync.RWMutex
// listeners for each network address
listeners []manet.Listener
listeners []conn.Listener
// cancel is an internal function used to stop the Swarm's processing.
cancel context.CancelFunc
......@@ -110,7 +109,7 @@ func (s *Swarm) Close() error {
// etc. to achive connection.
//
// For now, Dial uses only TCP. This will be extended.
func (s *Swarm) Dial(peer *peer.Peer) (*conn.Conn, error) {
func (s *Swarm) Dial(peer *peer.Peer) (conn.Conn, error) {
if peer.ID.Equal(s.local.ID) {
return nil, errors.New("Attempted connection to self!")
}
......@@ -128,7 +127,12 @@ func (s *Swarm) Dial(peer *peer.Peer) (*conn.Conn, error) {
}
// open connection to peer
c, err = conn.Dial("tcp", s.local, peer)
d := &conn.Dialer{
LocalPeer: s.local,
Peerstore: s.peers,
}
c, err = d.Dial(s.ctx, "tcp", s.local)
if err != nil {
return nil, err
}
......@@ -142,7 +146,7 @@ func (s *Swarm) Dial(peer *peer.Peer) (*conn.Conn, error) {
}
// GetConnection returns the connection in the swarm to given peer.ID
func (s *Swarm) GetConnection(pid peer.ID) *conn.Conn {
func (s *Swarm) GetConnection(pid peer.ID) conn.Conn {
s.connsLock.RLock()
c, found := s.conns[u.Key(pid)]
s.connsLock.RUnlock()
......@@ -181,7 +185,7 @@ func (s *Swarm) GetPeerList() []*peer.Peer {
var out []*peer.Peer
s.connsLock.RLock()
for _, p := range s.conns {
out = append(out, p.Remote)
out = append(out, p.RemotePeer())
}
s.connsLock.RUnlock()
return out
......
......@@ -49,13 +49,13 @@ func setupPeer(t *testing.T, addr string) *peer.Peer {
p.PrivKey = sk
p.PubKey = pk
p.AddAddress(tcp)
return p, nil
return p
}
func makeSwarms(ctx context.Context, t *testing.T, peers map[string]string) []*Swarm {
swarms := []*Swarm{}
for key, addr := range peers {
for _, addr := range peers {
local := setupPeer(t, addr)
peerstore := peer.NewPeerstore()
swarm, err := NewSwarm(ctx, local, peerstore)
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment