conn.go 4.38 KB
Newer Older
1
package conn
2 3

import (
4
	"fmt"
Jeromy's avatar
Jeromy committed
5
	"sync"
6
	"time"
7

Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
8
	context "github.com/jbenet/go-ipfs/Godeps/_workspace/src/code.google.com/p/go.net/context"
9
	msgio "github.com/jbenet/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-msgio"
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
10
	ma "github.com/jbenet/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-multiaddr"
11
	manet "github.com/jbenet/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-multiaddr/net"
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
12

13 14
	peer "github.com/jbenet/go-ipfs/peer"
	u "github.com/jbenet/go-ipfs/util"
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
15
	ctxc "github.com/jbenet/go-ipfs/util/ctxcloser"
16 17
)

Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
18 19
// log is the package-level logger, obtained from u.Logger with the name "conn".
var log = u.Logger("conn")

20 21 22
const (
	// ChanBuffer is the size of the buffer in the Conn Chan
	ChanBuffer = 10

	// MaxMessageSize is the size of the largest single message.
	// NOTE(review): 1 << 21 is 2 MiB (2097152 bytes). Raise it to 1 << 22
	// if a 4 MiB limit is what is actually wanted.
	MaxMessageSize = 1 << 21 // 2 MiB

	// HandshakeTimeout for when nodes first connect
	HandshakeTimeout = time.Second * 5
)
30

Jeromy's avatar
Jeromy committed
31 32 33 34 35 36 37 38 39 40 41
// BufferPool is the shared pool of message buffers. When the pool has
// nothing to hand out, it logs a warning and allocates a fresh byte slice
// of length MaxMessageSize.
var BufferPool = &sync.Pool{
	New: func() interface{} {
		log.Warning("Pool returning new object")
		return make([]byte, MaxMessageSize)
	},
}

func ReleaseBuffer(b []byte) {
42 43 44 45 46
	log.Warningf("Releasing buffer! (cap,size = %d, %d)", cap(b), len(b))
	if cap(b) != MaxMessageSize {
		log.Warning("Release buffer failed.")
		return
	}
Jeromy's avatar
Jeromy committed
47 48 49
	BufferPool.Put(b[:cap(b)])
}

Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
50 51 52 53 54
// msgioPipe is a pipe using msgio channels.
// It holds one msgio.Chan per direction of a connection.
type msgioPipe struct {
	// outgoing carries messages to be sent; newSingleConn drains it into
	// the underlying connection via WriteTo.
	outgoing *msgio.Chan
	// incoming carries received messages; newSingleConn fills it from the
	// underlying connection via ReadFrom.
	incoming *msgio.Chan
}
55

Jeromy's avatar
Jeromy committed
56
func newMsgioPipe(size int, pool *sync.Pool) *msgioPipe {
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
57
	return &msgioPipe{
Jeromy's avatar
Jeromy committed
58 59
		outgoing: msgio.NewChan(size),
		incoming: msgio.NewChanWithPool(size, pool),
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
60 61 62 63 64
	}
}

// singleConn represents a single connection to another Peer (IPFS Node).
type singleConn struct {
65 66
	local  peer.Peer
	remote peer.Peer
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
67
	maconn manet.Conn
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
68
	msgio  *msgioPipe
69

Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
70
	ctxc.ContextCloser
71 72
}

Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
73
// newConn constructs a new connection
74
func newSingleConn(ctx context.Context, local, remote peer.Peer,
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
75
	maconn manet.Conn) (Conn, error) {
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
76 77

	conn := &singleConn{
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
78 79 80
		local:  local,
		remote: remote,
		maconn: maconn,
Jeromy's avatar
Jeromy committed
81
		msgio:  newMsgioPipe(10, BufferPool),
82 83
	}

Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
84
	conn.ContextCloser = ctxc.NewContextCloser(ctx, conn.close)
85

Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
86 87 88
	log.Info("newSingleConn: %v to %v", local, remote)

	// setup the various io goroutines
89 90 91 92 93 94 95 96 97 98
	go func() {
		conn.Children().Add(1)
		conn.msgio.outgoing.WriteTo(maconn)
		conn.Children().Done()
	}()
	go func() {
		conn.Children().Add(1)
		conn.msgio.incoming.ReadFrom(maconn, MaxMessageSize)
		conn.Children().Done()
	}()
99

100 101
	// version handshake
	ctxT, _ := context.WithTimeout(ctx, HandshakeTimeout)
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
102
	if err := Handshake1(ctxT, conn); err != nil {
103
		conn.Close()
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
104
		return nil, fmt.Errorf("Handshake1 failed: %s", err)
105 106
	}

107 108 109
	return conn, nil
}

110 111
// close is the internal close function, called by ContextCloser.Close
func (c *singleConn) close() error {
112
	log.Debugf("%s closing Conn with %s", c.local, c.remote)
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
113 114

	// close underlying connection
115
	err := c.maconn.Close()
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
116
	c.msgio.outgoing.Close()
117
	return err
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
118 119
}

Jeromy's avatar
Jeromy committed
120 121 122 123 124 125 126 127 128 129 130
// GetError performs a non-blocking receive on the ErrChan of the incoming
// and outgoing msgio channels. It returns the received error if either
// channel has one ready, and nil otherwise.
func (c *singleConn) GetError() error {
	var pending error
	select {
	case pending = <-c.msgio.incoming.ErrChan:
	case pending = <-c.msgio.outgoing.ErrChan:
	default:
		// nothing ready on either channel
	}
	return pending
}

Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
131 132 133 134 135
// ID is an identifier unique to this connection.
// It is computed by the package-level ID function.
func (c *singleConn) ID() string {
	id := ID(c)
	return id
}

Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
136 137 138 139
// String returns the user-friendly representation of this connection,
// produced by the package-level String function with type "singleConn".
func (c *singleConn) String() string {
	const typ = "singleConn"
	return String(c, typ)
}

Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
140 141 142 143 144 145 146 147 148 149
// LocalMultiaddr is the Multiaddr on this side, as reported by the
// underlying connection.
func (c *singleConn) LocalMultiaddr() ma.Multiaddr {
	addr := c.maconn.LocalMultiaddr()
	return addr
}

// RemoteMultiaddr is the Multiaddr on the remote side, as reported by the
// underlying connection.
func (c *singleConn) RemoteMultiaddr() ma.Multiaddr {
	addr := c.maconn.RemoteMultiaddr()
	return addr
}

Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
150
// LocalPeer is the Peer on this side
151
func (c *singleConn) LocalPeer() peer.Peer {
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
152 153 154 155
	return c.local
}

// RemotePeer is the Peer on the remote side
156
func (c *singleConn) RemotePeer() peer.Peer {
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
157 158 159
	return c.remote
}

Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
160 161
// In returns a readable message channel
func (c *singleConn) In() <-chan []byte {
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
162
	return c.msgio.incoming.MsgChan
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
163 164
}

Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
165 166
// Out returns a writable message channel
func (c *singleConn) Out() chan<- []byte {
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
167
	return c.msgio.outgoing.MsgChan
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
168 169
}

Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
170
// ID returns the ID of a given Conn.
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
171
func ID(c Conn) string {
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
172 173
	l := fmt.Sprintf("%s/%s", c.LocalMultiaddr(), c.LocalPeer().ID())
	r := fmt.Sprintf("%s/%s", c.RemoteMultiaddr(), c.RemotePeer().ID())
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
174 175 176 177 178 179
	lh := u.Hash([]byte(l))
	rh := u.Hash([]byte(r))
	ch := u.XOR(lh, rh)
	return u.Key(ch).Pretty()
}

Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
180 181 182 183
// String returns the user-friendly String representation of a conn.
// The layout is: local peer, local multiaddr in parentheses, typ between
// arrows, remote multiaddr in parentheses, remote peer.
func String(c Conn, typ string) string {
	const layout = "%s (%s) <-- %s --> (%s) %s"
	return fmt.Sprintf(layout,
		c.LocalPeer(), c.LocalMultiaddr(), typ, c.RemoteMultiaddr(), c.RemotePeer())
}