swarm_listen.go 4.14 KB
Newer Older
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
1 2 3 4 5
package swarm

import (
	"fmt"

Jeromy's avatar
Jeromy committed
6 7
	lgbl "github.com/ipfs/go-libp2p-loggables"
	transport "github.com/ipfs/go-libp2p-transport"
Jeromy's avatar
Jeromy committed
8
	mconn "github.com/ipfs/go-libp2p/p2p/metrics/conn"
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
9 10
	inet "github.com/ipfs/go-libp2p/p2p/net"
	conn "github.com/ipfs/go-libp2p/p2p/net/conn"
Jeromy's avatar
Jeromy committed
11 12 13
	ma "github.com/jbenet/go-multiaddr"
	ps "github.com/jbenet/go-peerstream"
	context "golang.org/x/net/context"
Jeromy's avatar
Jeromy committed
14
)
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
15

Jeromy's avatar
Jeromy committed
16
// Open listeners and reuse-dialers for the given addresses
Jeromy's avatar
Jeromy committed
17 18 19 20 21 22 23 24 25
func (s *Swarm) setupInterfaces(addrs []ma.Multiaddr) error {
	errs := make([]error, len(addrs))
	var succeeded int
	for i, a := range addrs {
		tpt := s.transportForAddr(a)
		if tpt == nil {
			errs[i] = fmt.Errorf("no transport for address: %s", a)
			continue
		}
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
26

Jeromy's avatar
Jeromy committed
27 28 29 30
		d, err := tpt.Dialer(a, transport.TimeoutOpt(DialTimeout), transport.ReusePorts)
		if err != nil {
			errs[i] = err
			continue
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
31 32
		}

Jeromy's avatar
Jeromy committed
33
		s.dialer.AddDialer(d)
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
34

Jeromy's avatar
Jeromy committed
35
		list, err := tpt.Listen(a)
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
36
		if err != nil {
Jeromy's avatar
Jeromy committed
37 38
			errs[i] = err
			continue
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
39
		}
Jeromy's avatar
Jeromy committed
40 41 42 43 44 45 46

		err = s.addListener(list)
		if err != nil {
			errs[i] = err
			continue
		}
		succeeded++
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
47 48
	}

Jeromy's avatar
Jeromy committed
49 50 51 52
	for i, e := range errs {
		if e != nil {
			log.Warning("listen on %s failed: %s", addrs[i], errs[i])
		}
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
53
	}
Jeromy's avatar
Jeromy committed
54 55 56 57
	if succeeded == 0 && len(addrs) > 0 {
		return fmt.Errorf("failed to listen on any addresses: %s", errs)
	}

Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
58 59 60
	return nil
}

Jeromy's avatar
Jeromy committed
61 62 63 64 65 66 67 68 69
func (s *Swarm) transportForAddr(a ma.Multiaddr) transport.Transport {
	for _, t := range s.transports {
		if t.Matches(a) {
			return t
		}
	}

	return nil
}
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
70

Jeromy's avatar
Jeromy committed
71
func (s *Swarm) addListener(tptlist transport.Listener) error {
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
72 73 74 75 76 77

	sk := s.peers.PrivKey(s.local)
	if sk == nil {
		// may be fine for sk to be nil, just log a warning.
		log.Warning("Listener not given PrivateKey, so WILL NOT SECURE conns.")
	}
Jeromy's avatar
Jeromy committed
78 79

	list, err := conn.WrapTransportListener(s.Context(), tptlist, s.local, sk)
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
80 81 82 83 84 85 86
	if err != nil {
		return err
	}

	list.SetAddrFilters(s.Filters)

	if cw, ok := list.(conn.ListenerConnWrapper); ok {
Jeromy's avatar
Jeromy committed
87
		cw.SetConnWrapper(func(c transport.Conn) transport.Conn {
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
88 89 90 91
			return mconn.WrapConn(s.bwc, c)
		})
	}

Jeromy's avatar
Jeromy committed
92 93 94 95
	return s.addConnListener(list)
}

func (s *Swarm) addConnListener(list conn.Listener) error {
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
96 97 98 99 100 101 102 103
	// AddListener to the peerstream Listener. this will begin accepting connections
	// and streams!
	sl, err := s.swarm.AddListener(list)
	if err != nil {
		return err
	}
	log.Debugf("Swarm Listeners at %s", s.ListenAddresses())

Jeromy's avatar
Jeromy committed
104 105
	maddr := list.Multiaddr()

Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127
	// signal to our notifiees on successful conn.
	s.notifyAll(func(n inet.Notifiee) {
		n.Listen((*Network)(s), maddr)
	})

	// go consume peerstream's listen accept errors. note, these ARE errors.
	// they may be killing the listener, and if we get _any_ we should be
	// fixing this in our conn.Listener (to ignore them or handle them
	// differently.)
	go func(ctx context.Context, sl *ps.Listener) {

		// signal to our notifiees closing
		defer s.notifyAll(func(n inet.Notifiee) {
			n.ListenClose((*Network)(s), maddr)
		})

		for {
			select {
			case err, more := <-sl.AcceptErrors():
				if !more {
					return
				}
Jeromy's avatar
Jeromy committed
128
				log.Warningf("swarm listener accept error: %s", err)
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154
			case <-ctx.Done():
				return
			}
		}
	}(s.Context(), sl)

	return nil
}

// connHandler is called by the StreamSwarm whenever a new connection is added
// here we configure it slightly. Note that this is sequential, so if anything
// will take a while do it in a goroutine.
// See https://godoc.org/github.com/jbenet/go-peerstream for more information
func (s *Swarm) connHandler(c *ps.Conn) *Conn {
	ctx := context.Background()
	// this context is for running the handshake, which -- when receiveing connections
	// -- we have no bound on beyond what the transport protocol bounds it at.
	// note that setup + the handshake are bounded by underlying io.
	// (i.e. if TCP or UDP disconnects (or the swarm closes), we're done.
	// Q: why not have a shorter handshake? think about an HTTP server on really slow conns.
	// as long as the conn is live (TCP says its online), it tries its best. we follow suit.)

	sc, err := s.newConnSetup(ctx, c)
	if err != nil {
		log.Debug(err)
		log.Event(ctx, "newConnHandlerDisconnect", lgbl.NetConn(c.NetConn()), lgbl.Error(err))
Jeromy's avatar
Jeromy committed
155
		c.Close() // boom. close it.
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
156 157 158
		return nil
	}

Jeromy's avatar
Jeromy committed
159 160 161
	// if a peer dials us, remove from dial backoff.
	s.backf.Clear(sc.RemotePeer())

Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
162 163
	return sc
}