conn.go 4.28 KB
Newer Older
1
package conn
2 3

import (
4
	"fmt"
Jeromy's avatar
Jeromy committed
5
	"sync"
6
	"time"
7

Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
8
	context "github.com/jbenet/go-ipfs/Godeps/_workspace/src/code.google.com/p/go.net/context"
9
	msgio "github.com/jbenet/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-msgio"
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
10
	ma "github.com/jbenet/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-multiaddr"
11
	manet "github.com/jbenet/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-multiaddr/net"
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
12

13 14
	peer "github.com/jbenet/go-ipfs/peer"
	u "github.com/jbenet/go-ipfs/util"
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
15
	ctxc "github.com/jbenet/go-ipfs/util/ctxcloser"
16 17
)

Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
18 19
var log = u.Logger("conn")

20 21 22
const (
	// ChanBuffer is the size of the buffer in the Conn Chan
	ChanBuffer = 10
23

24
	// MaxMessageSize is the size of the largest single message
Jeromy's avatar
Jeromy committed
25
	MaxMessageSize = 1 << 22 // 4 MB
26 27 28 29

	// HandshakeTimeout for when nodes first connect
	HandshakeTimeout = time.Second * 5
)
30

Jeromy's avatar
Jeromy committed
31 32 33 34 35 36 37 38 39 40 41 42 43 44 45
var BufferPool *sync.Pool

func init() {
	BufferPool = new(sync.Pool)
	BufferPool.New = func() interface{} {
		log.Warning("Pool returning new object")
		return make([]byte, MaxMessageSize)
	}
}

func ReleaseBuffer(b []byte) {
	log.Warningf("Releasing buffer! (size = %d)", cap(b))
	BufferPool.Put(b[:cap(b)])
}

Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
46 47 48 49 50
// msgioPipe is a pipe using msgio channels.
type msgioPipe struct {
	outgoing *msgio.Chan
	incoming *msgio.Chan
}
51

Jeromy's avatar
Jeromy committed
52
func newMsgioPipe(size int, pool *sync.Pool) *msgioPipe {
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
53
	return &msgioPipe{
Jeromy's avatar
Jeromy committed
54 55
		outgoing: msgio.NewChan(size),
		incoming: msgio.NewChanWithPool(size, pool),
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
56 57 58 59 60
	}
}

// singleConn represents a single connection to another Peer (IPFS Node).
type singleConn struct {
61 62
	local  peer.Peer
	remote peer.Peer
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
63
	maconn manet.Conn
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
64
	msgio  *msgioPipe
65

Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
66
	ctxc.ContextCloser
67 68
}

Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
69
// newConn constructs a new connection
70
func newSingleConn(ctx context.Context, local, remote peer.Peer,
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
71
	maconn manet.Conn) (Conn, error) {
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
72 73

	conn := &singleConn{
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
74 75 76
		local:  local,
		remote: remote,
		maconn: maconn,
Jeromy's avatar
Jeromy committed
77
		msgio:  newMsgioPipe(10, BufferPool),
78 79
	}

Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
80
	conn.ContextCloser = ctxc.NewContextCloser(ctx, conn.close)
81

Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
82 83 84
	log.Info("newSingleConn: %v to %v", local, remote)

	// setup the various io goroutines
85 86 87 88 89 90 91 92 93 94
	go func() {
		conn.Children().Add(1)
		conn.msgio.outgoing.WriteTo(maconn)
		conn.Children().Done()
	}()
	go func() {
		conn.Children().Add(1)
		conn.msgio.incoming.ReadFrom(maconn, MaxMessageSize)
		conn.Children().Done()
	}()
95

96 97
	// version handshake
	ctxT, _ := context.WithTimeout(ctx, HandshakeTimeout)
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
98
	if err := Handshake1(ctxT, conn); err != nil {
99
		conn.Close()
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
100
		return nil, fmt.Errorf("Handshake1 failed: %s", err)
101 102
	}

103 104 105
	return conn, nil
}

106 107
// close is the internal close function, called by ContextCloser.Close
func (c *singleConn) close() error {
108
	log.Debugf("%s closing Conn with %s", c.local, c.remote)
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
109 110

	// close underlying connection
111
	err := c.maconn.Close()
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
112
	c.msgio.outgoing.Close()
113
	return err
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
114 115
}

Jeromy's avatar
Jeromy committed
116 117 118 119 120 121 122 123 124 125 126
func (c *singleConn) GetError() error {
	select {
	case err := <-c.msgio.incoming.ErrChan:
		return err
	case err := <-c.msgio.outgoing.ErrChan:
		return err
	default:
		return nil
	}
}

Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
127 128 129 130 131
// ID is an identifier unique to this connection.
func (c *singleConn) ID() string {
	return ID(c)
}

Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
132 133 134 135
func (c *singleConn) String() string {
	return String(c, "singleConn")
}

Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
136 137 138 139 140 141 142 143 144 145
// LocalMultiaddr is the Multiaddr on this side
func (c *singleConn) LocalMultiaddr() ma.Multiaddr {
	return c.maconn.LocalMultiaddr()
}

// RemoteMultiaddr is the Multiaddr on the remote side
func (c *singleConn) RemoteMultiaddr() ma.Multiaddr {
	return c.maconn.RemoteMultiaddr()
}

Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
146
// LocalPeer is the Peer on this side
147
func (c *singleConn) LocalPeer() peer.Peer {
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
148 149 150 151
	return c.local
}

// RemotePeer is the Peer on the remote side
152
func (c *singleConn) RemotePeer() peer.Peer {
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
153 154 155
	return c.remote
}

Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
156 157
// In returns a readable message channel
func (c *singleConn) In() <-chan []byte {
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
158
	return c.msgio.incoming.MsgChan
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
159 160
}

Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
161 162
// Out returns a writable message channel
func (c *singleConn) Out() chan<- []byte {
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
163
	return c.msgio.outgoing.MsgChan
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
164 165
}

Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
166
// ID returns the ID of a given Conn.
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
167
func ID(c Conn) string {
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
168 169
	l := fmt.Sprintf("%s/%s", c.LocalMultiaddr(), c.LocalPeer().ID())
	r := fmt.Sprintf("%s/%s", c.RemoteMultiaddr(), c.RemotePeer().ID())
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
170 171 172 173 174 175
	lh := u.Hash([]byte(l))
	rh := u.Hash([]byte(r))
	ch := u.XOR(lh, rh)
	return u.Key(ch).Pretty()
}

Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
176 177 178 179
// String returns the user-friendly String representation of a conn
func String(c Conn, typ string) string {
	return fmt.Sprintf("%s (%s) <-- %s --> (%s) %s",
		c.LocalPeer(), c.LocalMultiaddr(), typ, c.RemoteMultiaddr(), c.RemotePeer())
180
}