swarm_listen.go 4.26 KB
Newer Older
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
1 2 3 4 5
package swarm

import (
	"fmt"

Jeromy's avatar
Jeromy committed
6
	lgbl "github.com/ipfs/go-libp2p/loggables"
Jeromy's avatar
Jeromy committed
7
	mconn "github.com/ipfs/go-libp2p/p2p/metrics/conn"
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
8 9
	inet "github.com/ipfs/go-libp2p/p2p/net"
	conn "github.com/ipfs/go-libp2p/p2p/net/conn"
Jeromy's avatar
Jeromy committed
10 11
	transport "github.com/ipfs/go-libp2p/p2p/net/transport"

Jeromy's avatar
Jeromy committed
12 13
	ps "gx/ipfs/QmQDPXRFzRcCGPbPViQCKjzbQBkZGpLV1f9KwXnksSNcTK/go-peerstream"
	context "gx/ipfs/QmZy2y8t9zQH2a1b8q2ZSLKp17ATuJoCNxxyMFG5qFExpt/go-net/context"
Jeromy's avatar
Jeromy committed
14 15
	ma "gx/ipfs/QmcobAGsCjYt5DXoq9et9L8yR8er7o7Cu3DTvpaq12jYSz/go-multiaddr"
) // Open listeners and reuse-dialers for the given addresses
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
16

Jeromy's avatar
Jeromy committed
17 18 19 20 21 22 23 24 25
func (s *Swarm) setupInterfaces(addrs []ma.Multiaddr) error {
	errs := make([]error, len(addrs))
	var succeeded int
	for i, a := range addrs {
		tpt := s.transportForAddr(a)
		if tpt == nil {
			errs[i] = fmt.Errorf("no transport for address: %s", a)
			continue
		}
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
26

Jeromy's avatar
Jeromy committed
27 28 29 30
		d, err := tpt.Dialer(a, transport.TimeoutOpt(DialTimeout), transport.ReusePorts)
		if err != nil {
			errs[i] = err
			continue
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
31 32
		}

Jeromy's avatar
Jeromy committed
33
		s.dialer.AddDialer(d)
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
34

Jeromy's avatar
Jeromy committed
35
		list, err := tpt.Listen(a)
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
36
		if err != nil {
Jeromy's avatar
Jeromy committed
37 38
			errs[i] = err
			continue
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
39
		}
Jeromy's avatar
Jeromy committed
40 41 42 43 44 45 46

		err = s.addListener(list)
		if err != nil {
			errs[i] = err
			continue
		}
		succeeded++
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
47 48
	}

Jeromy's avatar
Jeromy committed
49 50 51 52
	for i, e := range errs {
		if e != nil {
			log.Warning("listen on %s failed: %s", addrs[i], errs[i])
		}
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
53
	}
Jeromy's avatar
Jeromy committed
54 55 56 57
	if succeeded == 0 && len(addrs) > 0 {
		return fmt.Errorf("failed to listen on any addresses: %s", errs)
	}

Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
58 59 60
	return nil
}

Jeromy's avatar
Jeromy committed
61 62 63 64 65 66 67 68 69
// transportForAddr returns the first of the swarm's transports whose Matches
// method accepts the address a, or nil when none of them does.
func (s *Swarm) transportForAddr(a ma.Multiaddr) transport.Transport {
	var match transport.Transport
	for _, candidate := range s.transports {
		if candidate.Matches(a) {
			match = candidate
			break
		}
	}
	return match
}
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
70

Jeromy's avatar
Jeromy committed
71
func (s *Swarm) addListener(tptlist transport.Listener) error {
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
72 73 74 75 76 77

	sk := s.peers.PrivKey(s.local)
	if sk == nil {
		// may be fine for sk to be nil, just log a warning.
		log.Warning("Listener not given PrivateKey, so WILL NOT SECURE conns.")
	}
Jeromy's avatar
Jeromy committed
78 79

	list, err := conn.WrapTransportListener(s.Context(), tptlist, s.local, sk)
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
80 81 82 83 84 85 86
	if err != nil {
		return err
	}

	list.SetAddrFilters(s.Filters)

	if cw, ok := list.(conn.ListenerConnWrapper); ok {
Jeromy's avatar
Jeromy committed
87
		cw.SetConnWrapper(func(c transport.Conn) transport.Conn {
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
88 89 90 91
			return mconn.WrapConn(s.bwc, c)
		})
	}

Jeromy's avatar
Jeromy committed
92 93 94 95
	return s.addConnListener(list)
}

func (s *Swarm) addConnListener(list conn.Listener) error {
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
96 97 98 99 100 101 102 103
	// AddListener to the peerstream Listener. this will begin accepting connections
	// and streams!
	sl, err := s.swarm.AddListener(list)
	if err != nil {
		return err
	}
	log.Debugf("Swarm Listeners at %s", s.ListenAddresses())

Jeromy's avatar
Jeromy committed
104 105
	maddr := list.Multiaddr()

Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127
	// signal to our notifiees on successful conn.
	s.notifyAll(func(n inet.Notifiee) {
		n.Listen((*Network)(s), maddr)
	})

	// go consume peerstream's listen accept errors. note, these ARE errors.
	// they may be killing the listener, and if we get _any_ we should be
	// fixing this in our conn.Listener (to ignore them or handle them
	// differently.)
	go func(ctx context.Context, sl *ps.Listener) {

		// signal to our notifiees closing
		defer s.notifyAll(func(n inet.Notifiee) {
			n.ListenClose((*Network)(s), maddr)
		})

		for {
			select {
			case err, more := <-sl.AcceptErrors():
				if !more {
					return
				}
Jeromy's avatar
Jeromy committed
128
				log.Warningf("swarm listener accept error: %s", err)
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158
			case <-ctx.Done():
				return
			}
		}
	}(s.Context(), sl)

	return nil
}

// connHandler is called by the StreamSwarm whenever a new connection is added
// here we configure it slightly. Note that this is sequential, so if anything
// will take a while do it in a goroutine.
// See https://godoc.org/github.com/jbenet/go-peerstream for more information
func (s *Swarm) connHandler(c *ps.Conn) *Conn {
	ctx := context.Background()
	// this context is for running the handshake, which -- when receiveing connections
	// -- we have no bound on beyond what the transport protocol bounds it at.
	// note that setup + the handshake are bounded by underlying io.
	// (i.e. if TCP or UDP disconnects (or the swarm closes), we're done.
	// Q: why not have a shorter handshake? think about an HTTP server on really slow conns.
	// as long as the conn is live (TCP says its online), it tries its best. we follow suit.)

	sc, err := s.newConnSetup(ctx, c)
	if err != nil {
		log.Debug(err)
		log.Event(ctx, "newConnHandlerDisconnect", lgbl.NetConn(c.NetConn()), lgbl.Error(err))
		c.Close() // boom. close it.
		return nil
	}

Jeromy's avatar
Jeromy committed
159 160 161
	// if a peer dials us, remove from dial backoff.
	s.backf.Clear(sc.RemotePeer())

Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
162 163
	return sc
}