conn.go 4.58 KB
Newer Older
1
package conn
2 3

import (
4
	"fmt"
Jeromy's avatar
Jeromy committed
5
	"sync"
6
	"time"
7

Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
8
	context "github.com/jbenet/go-ipfs/Godeps/_workspace/src/code.google.com/p/go.net/context"
9
	msgio "github.com/jbenet/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-msgio"
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
10
	ma "github.com/jbenet/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-multiaddr"
11
	manet "github.com/jbenet/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-multiaddr/net"
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
12

13 14
	peer "github.com/jbenet/go-ipfs/peer"
	u "github.com/jbenet/go-ipfs/util"
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
15
	ctxc "github.com/jbenet/go-ipfs/util/ctxcloser"
16 17
)

Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
18 19
var log = u.Logger("conn")

20 21 22
const (
	// ChanBuffer is the size of the buffer in the Conn Chan
	ChanBuffer = 10
23

24
	// MaxMessageSize is the size of the largest single message
Jeromy's avatar
Jeromy committed
25
	MaxMessageSize = 1 << 20
26 27 28 29

	// HandshakeTimeout for when nodes first connect
	HandshakeTimeout = time.Second * 5
)
30

Jeromy's avatar
Jeromy committed
31
// global static buffer pool for byte arrays of size MaxMessageSize
Jeromy's avatar
Jeromy committed
32 33 34 35 36 37 38 39 40 41
var BufferPool *sync.Pool

func init() {
	BufferPool = new(sync.Pool)
	BufferPool.New = func() interface{} {
		log.Warning("Pool returning new object")
		return make([]byte, MaxMessageSize)
	}
}

Jeromy's avatar
Jeromy committed
42 43
// ReleaseBuffer puts the given byte array back into the buffer pool,
// first verifying that it is the correct size
Jeromy's avatar
Jeromy committed
44
func ReleaseBuffer(b []byte) {
45 46
	log.Warningf("Releasing buffer! (cap,size = %d, %d)", cap(b), len(b))
	if cap(b) != MaxMessageSize {
Jeromy's avatar
Jeromy committed
47
		log.Warning("Release buffer failed (cap, size = %d, %d)", cap(b), len(b))
48 49
		return
	}
Jeromy's avatar
Jeromy committed
50 51 52
	BufferPool.Put(b[:cap(b)])
}

Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
53 54 55 56 57
// msgioPipe is a pipe using msgio channels.
type msgioPipe struct {
	outgoing *msgio.Chan
	incoming *msgio.Chan
}
58

Jeromy's avatar
Jeromy committed
59
func newMsgioPipe(size int, pool *sync.Pool) *msgioPipe {
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
60
	return &msgioPipe{
Jeromy's avatar
Jeromy committed
61 62
		outgoing: msgio.NewChan(size),
		incoming: msgio.NewChanWithPool(size, pool),
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
63 64 65 66 67
	}
}

// singleConn represents a single connection to another Peer (IPFS Node).
type singleConn struct {
68 69
	local  peer.Peer
	remote peer.Peer
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
70
	maconn manet.Conn
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
71
	msgio  *msgioPipe
72

Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
73
	ctxc.ContextCloser
74 75
}

Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
76
// newConn constructs a new connection
77
func newSingleConn(ctx context.Context, local, remote peer.Peer,
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
78
	maconn manet.Conn) (Conn, error) {
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
79 80

	conn := &singleConn{
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
81 82 83
		local:  local,
		remote: remote,
		maconn: maconn,
Jeromy's avatar
Jeromy committed
84
		msgio:  newMsgioPipe(10, BufferPool),
85 86
	}

Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
87
	conn.ContextCloser = ctxc.NewContextCloser(ctx, conn.close)
88

Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
89 90 91
	log.Info("newSingleConn: %v to %v", local, remote)

	// setup the various io goroutines
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
92
	conn.Children().Add(1)
93 94 95 96
	go func() {
		conn.msgio.outgoing.WriteTo(maconn)
		conn.Children().Done()
	}()
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
97
	conn.Children().Add(1)
98 99 100 101
	go func() {
		conn.msgio.incoming.ReadFrom(maconn, MaxMessageSize)
		conn.Children().Done()
	}()
102

103 104
	// version handshake
	ctxT, _ := context.WithTimeout(ctx, HandshakeTimeout)
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
105
	if err := Handshake1(ctxT, conn); err != nil {
106
		conn.Close()
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
107
		return nil, fmt.Errorf("Handshake1 failed: %s", err)
108 109
	}

110 111 112
	return conn, nil
}

113 114
// close is the internal close function, called by ContextCloser.Close
func (c *singleConn) close() error {
115
	log.Debugf("%s closing Conn with %s", c.local, c.remote)
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
116 117

	// close underlying connection
118
	err := c.maconn.Close()
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
119
	c.msgio.outgoing.Close()
120
	return err
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
121 122
}

Jeromy's avatar
Jeromy committed
123 124 125 126 127 128 129 130 131 132 133
func (c *singleConn) GetError() error {
	select {
	case err := <-c.msgio.incoming.ErrChan:
		return err
	case err := <-c.msgio.outgoing.ErrChan:
		return err
	default:
		return nil
	}
}

Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
134 135 136 137 138
// ID is an identifier unique to this connection.
func (c *singleConn) ID() string {
	return ID(c)
}

Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
139 140 141 142
func (c *singleConn) String() string {
	return String(c, "singleConn")
}

Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
143 144 145 146 147 148 149 150 151 152
// LocalMultiaddr is the Multiaddr on this side
func (c *singleConn) LocalMultiaddr() ma.Multiaddr {
	return c.maconn.LocalMultiaddr()
}

// RemoteMultiaddr is the Multiaddr on the remote side
func (c *singleConn) RemoteMultiaddr() ma.Multiaddr {
	return c.maconn.RemoteMultiaddr()
}

Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
153
// LocalPeer is the Peer on this side
154
func (c *singleConn) LocalPeer() peer.Peer {
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
155 156 157 158
	return c.local
}

// RemotePeer is the Peer on the remote side
159
func (c *singleConn) RemotePeer() peer.Peer {
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
160 161 162
	return c.remote
}

Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
163 164
// In returns a readable message channel
func (c *singleConn) In() <-chan []byte {
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
165
	return c.msgio.incoming.MsgChan
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
166 167
}

Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
168 169
// Out returns a writable message channel
func (c *singleConn) Out() chan<- []byte {
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
170
	return c.msgio.outgoing.MsgChan
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
171 172
}

Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
173
// ID returns the ID of a given Conn.
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
174
func ID(c Conn) string {
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
175 176
	l := fmt.Sprintf("%s/%s", c.LocalMultiaddr(), c.LocalPeer().ID())
	r := fmt.Sprintf("%s/%s", c.RemoteMultiaddr(), c.RemotePeer().ID())
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
177 178 179 180 181 182
	lh := u.Hash([]byte(l))
	rh := u.Hash([]byte(r))
	ch := u.XOR(lh, rh)
	return u.Key(ch).Pretty()
}

Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
183 184 185 186
// String returns the user-friendly String representation of a conn
func String(c Conn, typ string) string {
	return fmt.Sprintf("%s (%s) <-- %s --> (%s) %s",
		c.LocalPeer(), c.LocalMultiaddr(), typ, c.RemoteMultiaddr(), c.RemotePeer())
187
}