conn.go 5.76 KB
Newer Older
1 2 3 4 5 6 7
package swarm

import (
	"errors"
	"fmt"

	conn "github.com/jbenet/go-ipfs/net/conn"
Henry's avatar
Henry committed
8
	handshake "github.com/jbenet/go-ipfs/net/handshake"
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
9 10
	msg "github.com/jbenet/go-ipfs/net/message"

11
	proto "github.com/jbenet/go-ipfs/Godeps/_workspace/src/code.google.com/p/goprotobuf/proto"
12
	ma "github.com/jbenet/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-multiaddr"
13 14
)

Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
15 16 17 18 19 20 21 22 23 24 25 26 27
// Open listeners for each network the swarm should listen on
func (s *Swarm) listen() error {
	hasErr := false
	retErr := &ListenErr{
		Errors: make([]error, len(s.local.Addresses)),
	}

	// listen on every address
	for i, addr := range s.local.Addresses {
		err := s.connListen(addr)
		if err != nil {
			hasErr = true
			retErr.Errors[i] = err
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
28
			log.Error("Failed to listen on: %s - %s", addr, err)
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
29 30 31 32 33 34 35 36 37 38
		}
	}

	if hasErr {
		return retErr
	}
	return nil
}

// Listen for new connections on the given multiaddr
39
func (s *Swarm) connListen(maddr ma.Multiaddr) error {
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
40 41

	list, err := conn.Listen(s.ctx, maddr, s.local, s.peers)
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
42 43 44 45
	if err != nil {
		return err
	}

Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
46 47 48 49 50
	// make sure port can be reused. TOOD this doesn't work...
	// if err := setSocketReuse(list); err != nil {
	// 	return err
	// }

Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
51 52 53 54 55 56
	// NOTE: this may require a lock around it later. currently, only run on setup
	s.listeners = append(s.listeners, list)

	// Accept and handle new connections on this listener until it errors
	go func() {
		for {
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
57 58 59
			select {
			case <-s.ctx.Done():
				return
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
60

Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
61 62
			case conn := <-list.Accept():
				go s.handleIncomingConn(conn)
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
63 64 65 66 67 68 69
			}
		}
	}()

	return nil
}

70
// Handle getting ID from this peer, handshake, and adding it into the map
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
71
func (s *Swarm) handleIncomingConn(nconn conn.Conn) {
72 73

	// Setup the new connection
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
74
	err := s.connSetup(nconn)
75 76
	if err != nil && err != ErrAlreadyOpen {
		s.errChan <- err
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
77
		nconn.Close()
78 79 80 81 82
	}
}

// connSetup adds the passed in connection to its peerMap and starts
// the fanIn routine for that connection
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
83
func (s *Swarm) connSetup(c conn.Conn) error {
84 85 86 87
	if c == nil {
		return errors.New("Tried to start nil connection.")
	}

Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
88
	log.Debug("%s Started connection: %s", c.LocalPeer(), c.RemotePeer())
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
89

Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
90
	// add address of connection to Peer. Maybe it should happen in connSecure.
91 92 93 94
	// NOT adding this address here, because the incoming address in TCP
	// is an EPHEMERAL address, and not the address we want to keep around.
	// addresses should be figured out through the DHT.
	// c.Remote.AddAddress(c.Conn.RemoteMultiaddr())
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
95

96 97 98 99
	if err := s.connVersionExchange(c); err != nil {
		return fmt.Errorf("Conn version exchange error: %v", err)
	}

100 101
	// add to conns
	s.connsLock.Lock()
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
102
	if _, ok := s.conns[c.RemotePeer().Key()]; ok {
Jeromy's avatar
Jeromy committed
103
		log.Debug("Conn already open!")
104 105 106
		s.connsLock.Unlock()
		return ErrAlreadyOpen
	}
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
107
	s.conns[c.RemotePeer().Key()] = c
Jeromy's avatar
Jeromy committed
108
	log.Debug("Added conn to map!")
109 110 111 112 113 114 115
	s.connsLock.Unlock()

	// kick off reader goroutine
	go s.fanIn(c)
	return nil
}

116 117
// connVersionExchange exchanges local and remote versions and compares them
// closes remote and returns an error in case of major difference
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
118 119
func (s *Swarm) connVersionExchange(r conn.Conn) error {
	rpeer := r.RemotePeer()
120

Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
121 122 123 124
	var remoteH, localH *handshake.Handshake1
	localH = handshake.CurrentHandshake()

	myVerBytes, err := proto.Marshal(localH)
125
	if err != nil {
Henry's avatar
Henry committed
126
		return err
127 128
	}

Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
129 130
	r.MsgOut() <- msg.New(rpeer, myVerBytes)
	log.Debug("Sent my version(%s) [to = %s]", localH, rpeer)
131

Henry's avatar
Henry committed
132 133 134
	select {
	case <-s.ctx.Done():
		return s.ctx.Err()
135

Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
136 137
	// case <-remote.Done():
	// 	return errors.New("remote closed connection during version exchange")
138

Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
139
	case data, ok := <-r.MsgIn():
Henry's avatar
Henry committed
140
		if !ok {
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
141
			return fmt.Errorf("Error retrieving from conn: %v", rpeer)
Henry's avatar
Henry committed
142
		}
Henry's avatar
Henry committed
143

Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
144 145
		remoteH = new(handshake.Handshake1)
		err = proto.Unmarshal(data.Data(), remoteH)
Henry's avatar
Henry committed
146 147 148
		if err != nil {
			s.Close()
			return fmt.Errorf("connSetup: could not decode remote version: %q", err)
149
		}
Henry's avatar
Henry committed
150

Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
151
		log.Debug("Received remote version(%s) [from = %s]", remoteH, rpeer)
152 153
	}

Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
154 155 156
	if err := handshake.Compatible(localH, remoteH); err != nil {
		log.Info("%s (%s) incompatible version with %s (%s)", s.local, localH, rpeer, remoteH)
		r.Close()
Henry's avatar
Henry committed
157
		return err
Henry's avatar
Henry committed
158 159
	}

Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
160
	log.Debug("[peer: %s] Version compatible", rpeer)
Henry's avatar
Henry committed
161
	return nil
162 163
}

Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
164 165 166 167 168 169 170 171 172 173 174 175 176
// Handles the unwrapping + sending of messages to the right connection.
func (s *Swarm) fanOut() {
	for {
		select {
		case <-s.ctx.Done():
			return // told to close.

		case msg, ok := <-s.Outgoing:
			if !ok {
				return
			}

			s.connsLock.RLock()
177
			conn, found := s.conns[msg.Peer().Key()]
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
178 179 180 181 182 183 184 185 186
			s.connsLock.RUnlock()

			if !found {
				e := fmt.Errorf("Sent msg to peer without open conn: %v",
					msg.Peer)
				s.errChan <- e
				continue
			}

Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
187
			// log.Debug("[peer: %s] Sent message [to = %s]", s.local, msg.Peer())
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
188

Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
189
			// queue it in the connection's buffer
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
190
			conn.MsgOut() <- msg
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
191 192 193 194 195 196
		}
	}
}

// Handles the receiving + wrapping of messages, per conn.
// Consider using reflect.Select with one goroutine instead of n.
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
197
func (s *Swarm) fanIn(c conn.Conn) {
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
198 199 200 201 202 203 204
	for {
		select {
		case <-s.ctx.Done():
			// close Conn.
			c.Close()
			goto out

Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
205
		case data, ok := <-c.MsgIn():
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
206
			if !ok {
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
207
				e := fmt.Errorf("Error retrieving from conn: %v", c.RemotePeer())
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
208 209 210 211
				s.errChan <- e
				goto out
			}

Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
212
			// log.Debug("[peer: %s] Received message [from = %s]", s.local, c.Peer)
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
213
			s.Incoming <- data
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
214 215 216 217 218
		}
	}

out:
	s.connsLock.Lock()
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
219
	delete(s.conns, c.RemotePeer().Key())
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
220 221
	s.connsLock.Unlock()
}
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
222

223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245
// Commenting out because it's platform specific
// func setSocketReuse(l manet.Listener) error {
// 	nl := l.NetListener()
//
// 	// for now only TCP. TODO change this when more networks.
// 	file, err := nl.(*net.TCPListener).File()
// 	if err != nil {
// 		return err
// 	}
//
// 	fd := file.Fd()
// 	err = syscall.SetsockoptInt(int(fd), syscall.SOL_SOCKET, syscall.SO_REUSEADDR, 1)
// 	if err != nil {
// 		return err
// 	}
//
// 	err = syscall.SetsockoptInt(int(fd), syscall.SOL_SOCKET, syscall.SO_REUSEPORT, 1)
// 	if err != nil {
// 		return err
// 	}
//
// 	return nil
// }