conn.go 5.95 KB
Newer Older
1 2 3 4 5 6 7
package swarm

import (
	"errors"
	"fmt"

	conn "github.com/jbenet/go-ipfs/net/conn"
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
8 9
	msg "github.com/jbenet/go-ipfs/net/message"

Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
10
	context "github.com/jbenet/go-ipfs/Godeps/_workspace/src/code.google.com/p/go.net/context"
11
	ma "github.com/jbenet/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-multiaddr"
12 13
)

Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
14
// Open listeners for each network the swarm should listen on
15
func (s *Swarm) listen(addrs []ma.Multiaddr) error {
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
16 17
	hasErr := false
	retErr := &ListenErr{
18
		Errors: make([]error, len(addrs)),
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
19 20 21
	}

	// listen on every address
22
	for i, addr := range addrs {
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
23 24 25 26
		err := s.connListen(addr)
		if err != nil {
			hasErr = true
			retErr.Errors[i] = err
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
27
			log.Errorf("Failed to listen on: %s - %s", addr, err)
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
28 29 30 31 32 33 34 35 36 37
		}
	}

	if hasErr {
		return retErr
	}
	return nil
}

// Listen for new connections on the given multiaddr
38
func (s *Swarm) connListen(maddr ma.Multiaddr) error {
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
39

40 41 42 43 44
	resolved, err := resolveUnspecifiedAddresses([]ma.Multiaddr{maddr})
	if err != nil {
		return err
	}

Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
45
	list, err := conn.Listen(s.Context(), maddr, s.local, s.peers)
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
46 47 48 49
	if err != nil {
		return err
	}

50 51 52 53 54
	// add resolved local addresses to peer
	for _, addr := range resolved {
		s.local.AddAddress(addr)
	}

Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
55 56 57 58 59
	// make sure port can be reused. TOOD this doesn't work...
	// if err := setSocketReuse(list); err != nil {
	// 	return err
	// }

Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
60 61 62 63
	// NOTE: this may require a lock around it later. currently, only run on setup
	s.listeners = append(s.listeners, list)

	// Accept and handle new connections on this listener until it errors
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
64 65
	// this listener is a child.
	s.Children().Add(1)
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
66
	go func() {
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
67 68
		defer s.Children().Done()

Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
69
		for {
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
70
			select {
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
71
			case <-s.Closing():
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
72
				return
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
73

Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
74
			case conn := <-list.Accept():
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
75 76
				// handler also a child.
				s.Children().Add(1)
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
77
				go s.handleIncomingConn(conn)
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
78 79 80 81 82 83 84
			}
		}
	}()

	return nil
}

85
// Handle getting ID from this peer, handshake, and adding it into the map
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
86
func (s *Swarm) handleIncomingConn(nconn conn.Conn) {
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
87 88
	// this handler is a child. added by caller.
	defer s.Children().Done()
89 90

	// Setup the new connection
91
	_, err := s.connSetup(nconn)
92 93
	if err != nil && err != ErrAlreadyOpen {
		s.errChan <- err
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
94
		nconn.Close()
95 96 97 98
	}
}

// connSetup adds the passed in connection to its peerMap and starts
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
99
// the fanInSingle routine for that connection
100
func (s *Swarm) connSetup(c conn.Conn) (conn.Conn, error) {
101
	if c == nil {
102
		return nil, errors.New("Tried to start nil connection.")
103 104
	}

105
	log.Debugf("%s Started connection: %s", c.LocalPeer(), c.RemotePeer())
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
106

Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
107
	// add address of connection to Peer. Maybe it should happen in connSecure.
108 109 110 111
	// NOT adding this address here, because the incoming address in TCP
	// is an EPHEMERAL address, and not the address we want to keep around.
	// addresses should be figured out through the DHT.
	// c.Remote.AddAddress(c.Conn.RemoteMultiaddr())
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
112

Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
113 114
	// handshake3
	ctxT, _ := context.WithTimeout(c.Context(), conn.HandshakeTimeout)
115 116
	h3result, err := conn.Handshake3(ctxT, c)
	if err != nil {
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
117 118 119 120
		c.Close()
		return nil, fmt.Errorf("Handshake3 failed: %s", err)
	}

121 122 123
	// check for nats. you know, just in case.
	s.checkNATWarning(h3result.LocalObservedAddress)

124 125
	// add to conns
	s.connsLock.Lock()
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
126

Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
127 128
	mc, found := s.conns[c.RemotePeer().Key()]
	if !found {
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
129 130 131 132
		// multiconn doesn't exist, make a new one.
		conns := []conn.Conn{c}
		mc, err := conn.NewMultiConn(s.Context(), s.local, c.RemotePeer(), conns)
		if err != nil {
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
133
			log.Errorf("error creating multiconn: %s", err)
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
134 135 136 137 138
			c.Close()
			return nil, err
		}

		s.conns[c.RemotePeer().Key()] = mc
139
		s.connsLock.Unlock()
140

Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
141
		// kick off reader goroutine
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
142 143
		s.Children().Add(1)
		mc.Children().Add(1) // child of Conn as well.
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
144
		go s.fanInSingle(mc)
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
145
		log.Debugf("added new multiconn: %s", mc)
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
146 147 148 149
	} else {
		s.connsLock.Unlock() // unlock before adding new conn

		mc.Add(c)
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
150
		log.Debugf("multiconn found: %s", mc)
151
	}
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
152

Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
153
	log.Debugf("multiconn added new conn %s", c)
154
	return c, nil
155 156
}

Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
157 158
// Handles the unwrapping + sending of messages to the right connection.
func (s *Swarm) fanOut() {
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
159 160
	defer s.Children().Done()

Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
161
	i := 0
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
162 163
	for {
		select {
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
164
		case <-s.Closing():
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
165 166 167 168
			return // told to close.

		case msg, ok := <-s.Outgoing:
			if !ok {
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
169
				log.Infof("%s outgoing channel closed", s)
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
170 171
				return
			}
Jeromy's avatar
Jeromy committed
172 173 174
			if len(msg.Data()) >= conn.MaxMessageSize {
				log.Critical("Attempted to send message bigger than max size.")
			}
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
175 176

			s.connsLock.RLock()
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
177
			c, found := s.conns[msg.Peer().Key()]
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
178 179 180
			s.connsLock.RUnlock()

			if !found {
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
181
				e := fmt.Errorf("Sent msg to peer without open conn: %v", msg.Peer())
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
182
				s.errChan <- e
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
183
				log.Error(e)
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
184 185 186
				continue
			}

Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
187
			i++
Jeromy's avatar
Jeromy committed
188
			log.Debugf("%s sent message to %s (%d)", s.local, msg.Peer(), i)
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
189
			// queue it in the connection's buffer
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
190
			c.Out() <- msg.Data()
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
191 192 193 194 195 196
		}
	}
}

// Handles the receiving + wrapping of messages, per conn.
// Consider using reflect.Select with one goroutine instead of n.
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
197 198 199 200 201 202 203 204 205 206 207 208
func (s *Swarm) fanInSingle(c conn.Conn) {
	// cleanup all data associated with this child Connection.
	defer func() {
		// remove it from the map.
		s.connsLock.Lock()
		delete(s.conns, c.RemotePeer().Key())
		s.connsLock.Unlock()

		s.Children().Done()
		c.Children().Done() // child of Conn as well.
	}()

Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
209
	i := 0
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
210 211
	for {
		select {
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
212 213 214 215 216
		case <-s.Closing(): // Swarm closing
			return

		case <-c.Closing(): // Conn closing
			return
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
217

Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
218
		case data, ok := <-c.In():
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
219
			if !ok {
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
220
				log.Infof("%s in channel closed", c)
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
221
				return // channel closed.
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
222
			}
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
223
			i++
Jeromy's avatar
Jeromy committed
224
			log.Debugf("%s received message from %s (%d)", s.local, c.RemotePeer(), i)
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
225
			s.Incoming <- msg.New(c.RemotePeer(), data)
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
226 227 228
		}
	}
}
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
229

230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252
// Commenting out because it's platform specific
// func setSocketReuse(l manet.Listener) error {
// 	nl := l.NetListener()
//
// 	// for now only TCP. TODO change this when more networks.
// 	file, err := nl.(*net.TCPListener).File()
// 	if err != nil {
// 		return err
// 	}
//
// 	fd := file.Fd()
// 	err = syscall.SetsockoptInt(int(fd), syscall.SOL_SOCKET, syscall.SO_REUSEADDR, 1)
// 	if err != nil {
// 		return err
// 	}
//
// 	err = syscall.SetsockoptInt(int(fd), syscall.SOL_SOCKET, syscall.SO_REUSEPORT, 1)
// 	if err != nil {
// 		return err
// 	}
//
// 	return nil
// }