conn.go 4.28 KB
Newer Older
1
package conn
2 3

import (
4
	"fmt"
5
	"time"
6

Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
7
	context "github.com/jbenet/go-ipfs/Godeps/_workspace/src/code.google.com/p/go.net/context"
8
	msgio "github.com/jbenet/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-msgio"
9
	mpool "github.com/jbenet/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-msgio/mpool"
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
10
	ma "github.com/jbenet/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-multiaddr"
11
	manet "github.com/jbenet/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-multiaddr-net"
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
12

13 14
	peer "github.com/jbenet/go-ipfs/peer"
	u "github.com/jbenet/go-ipfs/util"
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
15
	ctxc "github.com/jbenet/go-ipfs/util/ctxcloser"
16 17
)

Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
18 19
var log = u.Logger("conn")

20 21 22
const (
	// ChanBuffer is the size of the buffer in the Conn Chan
	ChanBuffer = 10

	// MaxMessageSize is the size of the largest single message (1 MiB).
	MaxMessageSize = 1 << 20

	// HandshakeTimeout bounds the handshake performed when nodes first connect.
	HandshakeTimeout = time.Second * 5
)
30

Jeromy's avatar
Jeromy committed
31 32
// ReleaseBuffer puts the given byte array back into the buffer pool,
// first verifying that it is the correct size
Jeromy's avatar
Jeromy committed
33
func ReleaseBuffer(b []byte) {
34
	log.Warningf("Releasing buffer! (cap,size = %d, %d)", cap(b), len(b))
35
	mpool.ByteSlicePool.Put(uint32(cap(b)), b)
Jeromy's avatar
Jeromy committed
36 37
}

Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
38 39 40 41 42
// msgioPipe is a pipe using msgio channels.
// It pairs one channel per direction of a connection.
type msgioPipe struct {
	// outgoing carries messages to be written to the underlying connection
	// (newSingleConn drains it with WriteTo).
	outgoing *msgio.Chan
	// incoming carries messages read from the underlying connection
	// (newSingleConn fills it with ReadFromWithPool).
	incoming *msgio.Chan
}
43

44
func newMsgioPipe(size int) *msgioPipe {
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
45
	return &msgioPipe{
Jeromy's avatar
Jeromy committed
46
		outgoing: msgio.NewChan(size),
47
		incoming: msgio.NewChan(size),
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
48 49 50 51 52
	}
}

// singleConn represents a single connection to another Peer (IPFS Node).
type singleConn struct {
53 54
	local  peer.Peer
	remote peer.Peer
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
55
	maconn manet.Conn
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
56
	msgio  *msgioPipe
57

Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
58
	ctxc.ContextCloser
59 60
}

Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
61
// newConn constructs a new connection
62
func newSingleConn(ctx context.Context, local, remote peer.Peer,
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
63
	maconn manet.Conn) (Conn, error) {
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
64 65

	conn := &singleConn{
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
66 67 68
		local:  local,
		remote: remote,
		maconn: maconn,
69
		msgio:  newMsgioPipe(10),
70 71
	}

Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
72
	conn.ContextCloser = ctxc.NewContextCloser(ctx, conn.close)
73

Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
74 75 76
	log.Info("newSingleConn: %v to %v", local, remote)

	// setup the various io goroutines
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
77
	conn.Children().Add(1)
78 79 80 81
	go func() {
		conn.msgio.outgoing.WriteTo(maconn)
		conn.Children().Done()
	}()
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
82
	conn.Children().Add(1)
83
	go func() {
84
		conn.msgio.incoming.ReadFromWithPool(maconn, &mpool.ByteSlicePool)
85 86
		conn.Children().Done()
	}()
87

88 89
	// version handshake
	ctxT, _ := context.WithTimeout(ctx, HandshakeTimeout)
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
90
	if err := Handshake1(ctxT, conn); err != nil {
91
		conn.Close()
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
92
		return nil, fmt.Errorf("Handshake1 failed: %s", err)
93 94
	}

95 96 97
	return conn, nil
}

98 99
// close is the internal close function, called by ContextCloser.Close
func (c *singleConn) close() error {
100
	log.Debugf("%s closing Conn with %s", c.local, c.remote)
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
101 102

	// close underlying connection
103
	err := c.maconn.Close()
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
104
	c.msgio.outgoing.Close()
105
	return err
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
106 107
}

Jeromy's avatar
Jeromy committed
108 109 110 111 112 113 114 115 116 117 118
// GetError performs a non-blocking check of the incoming and outgoing msgio
// error channels. It returns an error received from either channel, or nil
// when neither has one ready.
func (c *singleConn) GetError() error {
	var pending error
	select {
	case pending = <-c.msgio.incoming.ErrChan:
	case pending = <-c.msgio.outgoing.ErrChan:
	default:
		// nothing ready on either channel; pending stays nil
	}
	return pending
}

Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
119 120 121 122 123
// ID returns this connection's identifier, computed by the package-level ID
// helper from the connection's endpoints.
func (c *singleConn) ID() string {
	id := ID(c)
	return id
}

Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
124 125 126 127
// String returns the user-friendly representation of this connection,
// labelled with the type name "singleConn".
func (c *singleConn) String() string {
	const typ = "singleConn"
	return String(c, typ)
}

Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
128 129 130 131 132 133 134 135 136 137
// LocalMultiaddr returns the Multiaddr of this side, as reported by the
// underlying connection.
func (c *singleConn) LocalMultiaddr() ma.Multiaddr {
	addr := c.maconn.LocalMultiaddr()
	return addr
}

// RemoteMultiaddr returns the Multiaddr of the remote side, as reported by
// the underlying connection.
func (c *singleConn) RemoteMultiaddr() ma.Multiaddr {
	addr := c.maconn.RemoteMultiaddr()
	return addr
}

Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
138
// LocalPeer is the Peer on this side
139
func (c *singleConn) LocalPeer() peer.Peer {
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
140 141 142 143
	return c.local
}

// RemotePeer is the Peer on the remote side
144
func (c *singleConn) RemotePeer() peer.Peer {
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
145 146 147
	return c.remote
}

Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
148 149
// In returns a readable message channel
func (c *singleConn) In() <-chan []byte {
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
150
	return c.msgio.incoming.MsgChan
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
151 152
}

Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
153 154
// Out returns a writable message channel
func (c *singleConn) Out() chan<- []byte {
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
155
	return c.msgio.outgoing.MsgChan
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
156 157
}

Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
158
// ID returns the ID of a given Conn.
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
159
func ID(c Conn) string {
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
160 161
	l := fmt.Sprintf("%s/%s", c.LocalMultiaddr(), c.LocalPeer().ID())
	r := fmt.Sprintf("%s/%s", c.RemoteMultiaddr(), c.RemotePeer().ID())
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
162 163 164 165 166 167
	lh := u.Hash([]byte(l))
	rh := u.Hash([]byte(r))
	ch := u.XOR(lh, rh)
	return u.Key(ch).Pretty()
}

Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
168 169 170 171
// String returns the user-friendly String representation of a conn, in the
// form "<local peer> (<local addr>) <-- <typ> --> (<remote addr>) <remote peer>".
func String(c Conn, typ string) string {
	return fmt.Sprintf("%s (%s) <-- %s --> (%s) %s",
		c.LocalPeer(), c.LocalMultiaddr(), typ, c.RemoteMultiaddr(), c.RemotePeer())
}