conn.go 7.12 KB
Newer Older
1 2 3 4 5 6
package swarm

import (
	"errors"
	"fmt"

Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
7
	spipe "github.com/jbenet/go-ipfs/crypto/spipe"
8
	conn "github.com/jbenet/go-ipfs/net/conn"
Henry's avatar
Henry committed
9
	handshake "github.com/jbenet/go-ipfs/net/handshake"
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
10 11
	msg "github.com/jbenet/go-ipfs/net/message"

12
	proto "github.com/jbenet/go-ipfs/Godeps/_workspace/src/code.google.com/p/goprotobuf/proto"
13
	ma "github.com/jbenet/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-multiaddr"
14
	manet "github.com/jbenet/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-multiaddr/net"
15 16
)

Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
17 18 19 20 21 22 23 24 25 26 27 28 29
// Open listeners for each network the swarm should listen on
func (s *Swarm) listen() error {
	hasErr := false
	retErr := &ListenErr{
		Errors: make([]error, len(s.local.Addresses)),
	}

	// listen on every address
	for i, addr := range s.local.Addresses {
		err := s.connListen(addr)
		if err != nil {
			hasErr = true
			retErr.Errors[i] = err
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
30
			log.Error("Failed to listen on: %s - %s", addr, err)
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
31 32 33 34 35 36 37 38 39 40
		}
	}

	if hasErr {
		return retErr
	}
	return nil
}

// Listen for new connections on the given multiaddr
41
func (s *Swarm) connListen(maddr ma.Multiaddr) error {
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
42
	list, err := manet.Listen(maddr)
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
43 44 45 46
	if err != nil {
		return err
	}

Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
47 48 49 50 51
	// make sure port can be reused. TOOD this doesn't work...
	// if err := setSocketReuse(list); err != nil {
	// 	return err
	// }

Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
52 53 54 55 56 57 58 59
	// NOTE: this may require a lock around it later. currently, only run on setup
	s.listeners = append(s.listeners, list)

	// Accept and handle new connections on this listener until it errors
	go func() {
		for {
			nconn, err := list.Accept()
			if err != nil {
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
60
				e := fmt.Errorf("Failed to accept connection: %s - %s", maddr, err)
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
61 62 63 64 65 66 67 68 69 70 71 72 73 74 75
				s.errChan <- e

				// if cancel is nil, we're closed.
				if s.cancel == nil {
					return
				}
			} else {
				go s.handleIncomingConn(nconn)
			}
		}
	}()

	return nil
}

76
// Handle getting ID from this peer, handshake, and adding it into the map
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
77
func (s *Swarm) handleIncomingConn(nconn manet.Conn) {
78

Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
79 80
	// Construct conn with nil peer for now, because we don't know its ID yet.
	// connSetup will figure this out, and pull out / construct the peer.
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
81
	c, err := conn.NewConn(s.local, nil, nconn)
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
82 83 84 85
	if err != nil {
		s.errChan <- err
		return
	}
86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101

	// Setup the new connection
	err = s.connSetup(c)
	if err != nil && err != ErrAlreadyOpen {
		s.errChan <- err
		c.Close()
	}
}

// connSetup adds the passed in connection to its peerMap and starts
// the fanIn routine for that connection
func (s *Swarm) connSetup(c *conn.Conn) error {
	if c == nil {
		return errors.New("Tried to start nil connection.")
	}

Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
102 103
	if c.Remote != nil {
		log.Debug("%s Starting connection: %s", c.Local, c.Remote)
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
104
	} else {
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
105
		log.Debug("%s Starting connection: [unknown peer]", c.Local)
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
106
	}
107

Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
108 109 110
	if err := s.connSecure(c); err != nil {
		return fmt.Errorf("Conn securing error: %v", err)
	}
111

Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
112
	log.Debug("%s secured connection: %s", c.Local, c.Remote)
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
113

Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
114
	// add address of connection to Peer. Maybe it should happen in connSecure.
115 116 117 118
	// NOT adding this address here, because the incoming address in TCP
	// is an EPHEMERAL address, and not the address we want to keep around.
	// addresses should be figured out through the DHT.
	// c.Remote.AddAddress(c.Conn.RemoteMultiaddr())
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
119

120 121 122 123
	if err := s.connVersionExchange(c); err != nil {
		return fmt.Errorf("Conn version exchange error: %v", err)
	}

124 125
	// add to conns
	s.connsLock.Lock()
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
126
	if _, ok := s.conns[c.Remote.Key()]; ok {
Jeromy's avatar
Jeromy committed
127
		log.Debug("Conn already open!")
128 129 130
		s.connsLock.Unlock()
		return ErrAlreadyOpen
	}
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
131
	s.conns[c.Remote.Key()] = c
Jeromy's avatar
Jeromy committed
132
	log.Debug("Added conn to map!")
133 134 135 136 137 138 139
	s.connsLock.Unlock()

	// kick off reader goroutine
	go s.fanIn(c)
	return nil
}

Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
140 141 142
// connSecure setups a secure remote connection.
func (s *Swarm) connSecure(c *conn.Conn) error {

Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
143
	sp, err := spipe.NewSecurePipe(s.ctx, 10, s.local, s.peers)
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
144 145 146 147 148 149 150 151 152 153 154
	if err != nil {
		return err
	}

	err = sp.Wrap(s.ctx, spipe.Duplex{
		In:  c.Incoming.MsgChan,
		Out: c.Outgoing.MsgChan,
	})
	if err != nil {
		return err
	}
155

Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
156 157
	if c.Remote == nil {
		c.Remote = sp.RemotePeer()
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
158

Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
159
	} else if c.Remote != sp.RemotePeer() {
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
160 161 162
		panic("peers not being constructed correctly.")
	}

Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
163
	c.Secure = sp
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
164
	return nil
165
}
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
166

167 168 169
// connVersionExchange exchanges local and remote versions and compares them
// closes remote and returns an error in case of major difference
func (s *Swarm) connVersionExchange(remote *conn.Conn) error {
Henry's avatar
Henry committed
170 171
	var remoteHandshake, localHandshake *handshake.Handshake1
	localHandshake = handshake.CurrentHandshake()
172

Henry's avatar
Henry committed
173
	myVerBytes, err := proto.Marshal(localHandshake)
174
	if err != nil {
Henry's avatar
Henry committed
175
		return err
176 177
	}

Henry's avatar
Henry committed
178
	remote.Secure.Out <- myVerBytes
Henry's avatar
Henry committed
179

Henry's avatar
Henry committed
180
	log.Debug("Send my version(%s) [to = %s]", localHandshake, remote.Peer)
181

Henry's avatar
Henry committed
182 183 184
	select {
	case <-s.ctx.Done():
		return s.ctx.Err()
185

Henry's avatar
Henry committed
186 187
	case <-remote.Closed:
		return errors.New("remote closed connection during version exchange")
188

Henry's avatar
Henry committed
189 190 191 192
	case data, ok := <-remote.Secure.In:
		if !ok {
			return fmt.Errorf("Error retrieving from conn: %v", remote.Peer)
		}
Henry's avatar
Henry committed
193

Henry's avatar
Henry committed
194 195 196 197 198
		remoteHandshake = new(handshake.Handshake1)
		err = proto.Unmarshal(data, remoteHandshake)
		if err != nil {
			s.Close()
			return fmt.Errorf("connSetup: could not decode remote version: %q", err)
199
		}
Henry's avatar
Henry committed
200 201

		log.Debug("Received remote version(%s) [from = %s]", remoteHandshake, remote.Peer)
202 203
	}

Henry's avatar
Henry committed
204 205
	if err := handshake.Compatible(localHandshake, remoteHandshake); err != nil {
		log.Info("%s (%s) incompatible version with %s (%s)", s.local, localHandshake, remote.Peer, remoteHandshake)
Henry's avatar
Henry committed
206
		remote.Close()
Henry's avatar
Henry committed
207
		return err
Henry's avatar
Henry committed
208 209
	}

210
	log.Debug("[peer: %s] Version compatible", remote.Peer)
Henry's avatar
Henry committed
211
	return nil
212 213
}

Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
214 215 216 217 218 219 220 221 222 223 224 225 226
// Handles the unwrapping + sending of messages to the right connection.
func (s *Swarm) fanOut() {
	for {
		select {
		case <-s.ctx.Done():
			return // told to close.

		case msg, ok := <-s.Outgoing:
			if !ok {
				return
			}

			s.connsLock.RLock()
227
			conn, found := s.conns[msg.Peer().Key()]
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
228 229 230 231 232 233 234 235 236
			s.connsLock.RUnlock()

			if !found {
				e := fmt.Errorf("Sent msg to peer without open conn: %v",
					msg.Peer)
				s.errChan <- e
				continue
			}

Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
237
			// log.Debug("[peer: %s] Sent message [to = %s]", s.local, msg.Peer())
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
238

Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
239
			// queue it in the connection's buffer
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
240
			conn.Secure.Out <- msg.Data()
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257
		}
	}
}

// Handles the receiving + wrapping of messages, per conn.
// Consider using reflect.Select with one goroutine instead of n.
func (s *Swarm) fanIn(c *conn.Conn) {
	for {
		select {
		case <-s.ctx.Done():
			// close Conn.
			c.Close()
			goto out

		case <-c.Closed:
			goto out

Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
258
		case data, ok := <-c.Secure.In:
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
259
			if !ok {
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
260
				e := fmt.Errorf("Error retrieving from conn: %v", c.Remote)
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
261 262 263 264
				s.errChan <- e
				goto out
			}

Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
265
			// log.Debug("[peer: %s] Received message [from = %s]", s.local, c.Peer)
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
266

Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
267
			msg := msg.New(c.Remote, data)
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
268 269 270 271 272 273
			s.Incoming <- msg
		}
	}

out:
	s.connsLock.Lock()
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
274
	delete(s.conns, c.Remote.Key())
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
275 276
	s.connsLock.Unlock()
}
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
277

278 279 280 281 282 283 284 285 286 287 288 289 290 291 292 293 294 295 296 297 298 299 300
// Commenting out because it's platform specific
// func setSocketReuse(l manet.Listener) error {
// 	nl := l.NetListener()
//
// 	// for now only TCP. TODO change this when more networks.
// 	file, err := nl.(*net.TCPListener).File()
// 	if err != nil {
// 		return err
// 	}
//
// 	fd := file.Fd()
// 	err = syscall.SetsockoptInt(int(fd), syscall.SOL_SOCKET, syscall.SO_REUSEADDR, 1)
// 	if err != nil {
// 		return err
// 	}
//
// 	err = syscall.SetsockoptInt(int(fd), syscall.SOL_SOCKET, syscall.SO_REUSEPORT, 1)
// 	if err != nil {
// 		return err
// 	}
//
// 	return nil
// }