conn.go 4.6 KB
Newer Older
1 2 3 4 5 6
package swarm

import (
	"errors"
	"fmt"

Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
7
	spipe "github.com/jbenet/go-ipfs/crypto/spipe"
8
	conn "github.com/jbenet/go-ipfs/net/conn"
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
9 10
	msg "github.com/jbenet/go-ipfs/net/message"

11
	ma "github.com/jbenet/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-multiaddr"
12
	manet "github.com/jbenet/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-multiaddr/net"
13 14
)

Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
15 16 17 18 19 20 21 22 23 24 25 26 27
// Open listeners for each network the swarm should listen on
func (s *Swarm) listen() error {
	hasErr := false
	retErr := &ListenErr{
		Errors: make([]error, len(s.local.Addresses)),
	}

	// listen on every address
	for i, addr := range s.local.Addresses {
		err := s.connListen(addr)
		if err != nil {
			hasErr = true
			retErr.Errors[i] = err
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
28
			log.Error("Failed to listen on: %s - %s", addr, err)
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
29 30 31 32 33 34 35 36 37 38
		}
	}

	if hasErr {
		return retErr
	}
	return nil
}

// Listen for new connections on the given multiaddr
39
func (s *Swarm) connListen(maddr ma.Multiaddr) error {
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
40
	list, err := manet.Listen(maddr)
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
41 42 43 44 45 46 47 48 49 50 51 52
	if err != nil {
		return err
	}

	// NOTE: this may require a lock around it later. currently, only run on setup
	s.listeners = append(s.listeners, list)

	// Accept and handle new connections on this listener until it errors
	go func() {
		for {
			nconn, err := list.Accept()
			if err != nil {
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
53
				e := fmt.Errorf("Failed to accept connection: %s - %s", maddr, err)
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
54 55 56 57 58 59 60 61 62 63 64 65 66 67 68
				s.errChan <- e

				// if cancel is nil, we're closed.
				if s.cancel == nil {
					return
				}
			} else {
				go s.handleIncomingConn(nconn)
			}
		}
	}()

	return nil
}

69
// Handle getting ID from this peer, handshake, and adding it into the map
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
70
func (s *Swarm) handleIncomingConn(nconn manet.Conn) {
71

Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
72
	addr := nconn.RemoteMultiaddr()
73

Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
74 75 76 77 78 79 80
	// Construct conn with nil peer for now, because we don't know its ID yet.
	// connSetup will figure this out, and pull out / construct the peer.
	c, err := conn.NewConn(nil, addr, nconn)
	if err != nil {
		s.errChan <- err
		return
	}
81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96

	// Setup the new connection
	err = s.connSetup(c)
	if err != nil && err != ErrAlreadyOpen {
		s.errChan <- err
		c.Close()
	}
}

// connSetup adds the passed in connection to its peerMap and starts
// the fanIn routine for that connection
func (s *Swarm) connSetup(c *conn.Conn) error {
	if c == nil {
		return errors.New("Tried to start nil connection.")
	}

Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
97
	if c.Peer != nil {
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
98
		log.Debug("Starting connection: %s", c.Peer)
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
99
	} else {
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
100
		log.Debug("Starting connection: [unknown peer]")
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
101
	}
102

Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
103 104 105
	if err := s.connSecure(c); err != nil {
		return fmt.Errorf("Conn securing error: %v", err)
	}
106

Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
107
	log.Debug("Secured connection: %s", c.Peer)
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
108

Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
109
	// add address of connection to Peer. Maybe it should happen in connSecure.
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
110 111
	c.Peer.AddAddress(c.Addr)

112 113 114
	// add to conns
	s.connsLock.Lock()
	if _, ok := s.conns[c.Peer.Key()]; ok {
Jeromy's avatar
Jeromy committed
115
		log.Debug("Conn already open!")
116 117 118 119
		s.connsLock.Unlock()
		return ErrAlreadyOpen
	}
	s.conns[c.Peer.Key()] = c
Jeromy's avatar
Jeromy committed
120
	log.Debug("Added conn to map!")
121 122 123 124 125 126 127
	s.connsLock.Unlock()

	// kick off reader goroutine
	go s.fanIn(c)
	return nil
}

Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
128 129 130
// connSecure setups a secure remote connection.
func (s *Swarm) connSecure(c *conn.Conn) error {

Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
131
	sp, err := spipe.NewSecurePipe(s.ctx, 10, s.local, s.peers)
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
132 133 134 135 136 137 138 139 140 141 142
	if err != nil {
		return err
	}

	err = sp.Wrap(s.ctx, spipe.Duplex{
		In:  c.Incoming.MsgChan,
		Out: c.Outgoing.MsgChan,
	})
	if err != nil {
		return err
	}
143

Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
144 145 146 147 148 149 150
	if c.Peer == nil {
		c.Peer = sp.RemotePeer()

	} else if c.Peer != sp.RemotePeer() {
		panic("peers not being constructed correctly.")
	}

Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
151
	c.Secure = sp
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
152
	return nil
153
}
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
154 155 156 157 158 159 160 161 162 163 164 165 166 167

// Handles the unwrapping + sending of messages to the right connection.
func (s *Swarm) fanOut() {
	for {
		select {
		case <-s.ctx.Done():
			return // told to close.

		case msg, ok := <-s.Outgoing:
			if !ok {
				return
			}

			s.connsLock.RLock()
168
			conn, found := s.conns[msg.Peer().Key()]
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
169 170 171 172 173 174 175 176 177
			s.connsLock.RUnlock()

			if !found {
				e := fmt.Errorf("Sent msg to peer without open conn: %v",
					msg.Peer)
				s.errChan <- e
				continue
			}

Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
178
			// log.Debug("[peer: %s] Sent message [to = %s]", s.local, msg.Peer())
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
179

Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
180
			// queue it in the connection's buffer
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
181
			conn.Secure.Out <- msg.Data()
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198
		}
	}
}

// Handles the receiving + wrapping of messages, per conn.
// Consider using reflect.Select with one goroutine instead of n.
func (s *Swarm) fanIn(c *conn.Conn) {
	for {
		select {
		case <-s.ctx.Done():
			// close Conn.
			c.Close()
			goto out

		case <-c.Closed:
			goto out

Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
199
		case data, ok := <-c.Secure.In:
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
200
			if !ok {
201
				e := fmt.Errorf("Error retrieving from conn: %v", c.Peer)
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
202 203 204 205
				s.errChan <- e
				goto out
			}

Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
206
			// log.Debug("[peer: %s] Received message [from = %s]", s.local, c.Peer)
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
207

208
			msg := msg.New(c.Peer, data)
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
209 210 211 212 213 214 215 216 217
			s.Incoming <- msg
		}
	}

out:
	s.connsLock.Lock()
	delete(s.conns, c.Peer.Key())
	s.connsLock.Unlock()
}