swarm_listen.go 4.27 KB
Newer Older
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
1 2 3
package swarm

import (
Jeromy's avatar
Jeromy committed
4
	"context"
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
5 6
	"fmt"

Jeromy's avatar
Jeromy committed
7 8 9 10 11 12 13 14
	conn "github.com/libp2p/go-libp2p-conn"
	iconn "github.com/libp2p/go-libp2p-interface-conn"
	lgbl "github.com/libp2p/go-libp2p-loggables"
	mconn "github.com/libp2p/go-libp2p-metrics/conn"
	inet "github.com/libp2p/go-libp2p-net"
	transport "github.com/libp2p/go-libp2p-transport"
	ps "github.com/libp2p/go-peerstream"
	ma "github.com/multiformats/go-multiaddr"
Jeromy's avatar
Jeromy committed
15
)
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
16

17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42
// AddListenAddr starts listening on the given multiaddr.
//
// It looks up the transport that matches a, builds a dialer from that
// transport (using DialTimeout and the ReusePorts option) and registers it
// with the swarm's dialer, then opens a transport listener on a and hands it
// to addListener.
//
// An error is returned when no transport matches a, or when creating the
// dialer, opening the listener, or registering the listener fails. Note that
// the dialer is registered before the listen attempt and is not removed if a
// later step fails.
func (s *Swarm) AddListenAddr(a ma.Multiaddr) error {
	tpt := s.transportForAddr(a)
	if tpt == nil {
		return fmt.Errorf("no transport for address: %s", a)
	}

	reuseDialer, err := tpt.Dialer(a, transport.TimeoutOpt(DialTimeout), transport.ReusePorts)
	if err != nil {
		return err
	}
	s.dialer.AddDialer(reuseDialer)

	tptListener, err := tpt.Listen(a)
	if err != nil {
		return err
	}

	return s.addListener(tptListener)
}

Jeromy's avatar
Jeromy committed
43
// Open listeners and reuse-dialers for the given addresses
Jeromy's avatar
Jeromy committed
44 45 46 47
func (s *Swarm) setupInterfaces(addrs []ma.Multiaddr) error {
	errs := make([]error, len(addrs))
	var succeeded int
	for i, a := range addrs {
48
		if err := s.AddListenAddr(a); err != nil {
Jeromy's avatar
Jeromy committed
49
			errs[i] = err
50 51
		} else {
			succeeded++
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
52 53 54
		}
	}

Jeromy's avatar
Jeromy committed
55 56
	for i, e := range errs {
		if e != nil {
Jeromy's avatar
Jeromy committed
57
			log.Warningf("listen on %s failed: %s", addrs[i], errs[i])
Jeromy's avatar
Jeromy committed
58
		}
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
59
	}
60

Jeromy's avatar
Jeromy committed
61 62 63 64
	if succeeded == 0 && len(addrs) > 0 {
		return fmt.Errorf("failed to listen on any addresses: %s", errs)
	}

Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
65 66 67
	return nil
}

Jeromy's avatar
Jeromy committed
68 69 70 71 72 73 74 75 76
// transportForAddr returns the first of the swarm's transports whose Matches
// method accepts the given multiaddr, or nil when none of them does.
func (s *Swarm) transportForAddr(a ma.Multiaddr) transport.Transport {
	for _, candidate := range s.transports {
		if !candidate.Matches(a) {
			continue
		}
		return candidate
	}

	return nil
}
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
77

Jeromy's avatar
Jeromy committed
78
func (s *Swarm) addListener(tptlist transport.Listener) error {
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
79 80 81 82 83 84

	sk := s.peers.PrivKey(s.local)
	if sk == nil {
		// may be fine for sk to be nil, just log a warning.
		log.Warning("Listener not given PrivateKey, so WILL NOT SECURE conns.")
	}
Jeromy's avatar
Jeromy committed
85

86
	list, err := conn.WrapTransportListenerWithProtector(s.Context(), tptlist, s.local, sk, s.protec)
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
87 88 89 90 91 92
	if err != nil {
		return err
	}

	list.SetAddrFilters(s.Filters)

93
	if cw, ok := list.(conn.ListenerConnWrapper); ok && s.bwc != nil {
Jeromy's avatar
Jeromy committed
94
		cw.SetConnWrapper(func(c transport.Conn) transport.Conn {
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
95 96 97 98
			return mconn.WrapConn(s.bwc, c)
		})
	}

Jeromy's avatar
Jeromy committed
99 100 101
	return s.addConnListener(list)
}

102
func (s *Swarm) addConnListener(list iconn.Listener) error {
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
103 104 105 106 107 108 109 110
	// AddListener to the peerstream Listener. this will begin accepting connections
	// and streams!
	sl, err := s.swarm.AddListener(list)
	if err != nil {
		return err
	}
	log.Debugf("Swarm Listeners at %s", s.ListenAddresses())

Jeromy's avatar
Jeromy committed
111 112
	maddr := list.Multiaddr()

Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134
	// signal to our notifiees on successful conn.
	s.notifyAll(func(n inet.Notifiee) {
		n.Listen((*Network)(s), maddr)
	})

	// go consume peerstream's listen accept errors. note, these ARE errors.
	// they may be killing the listener, and if we get _any_ we should be
	// fixing this in our conn.Listener (to ignore them or handle them
	// differently.)
	go func(ctx context.Context, sl *ps.Listener) {

		// signal to our notifiees closing
		defer s.notifyAll(func(n inet.Notifiee) {
			n.ListenClose((*Network)(s), maddr)
		})

		for {
			select {
			case err, more := <-sl.AcceptErrors():
				if !more {
					return
				}
Jeromy's avatar
Jeromy committed
135
				log.Warningf("swarm listener accept error: %s", err)
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
136 137 138 139 140 141 142 143 144 145 146 147
			case <-ctx.Done():
				return
			}
		}
	}(s.Context(), sl)

	return nil
}

// connHandler is called by the StreamSwarm whenever a new connection is added
// here we configure it slightly. Note that this is sequential, so if anything
// will take a while do it in a goroutine.
Steven Allen's avatar
Steven Allen committed
148
// See https://godoc.org/github.com/libp2p/go-peerstream for more information
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
149 150 151 152 153 154 155 156 157 158 159 160 161
func (s *Swarm) connHandler(c *ps.Conn) *Conn {
	ctx := context.Background()
	// this context is for running the handshake, which -- when receiveing connections
	// -- we have no bound on beyond what the transport protocol bounds it at.
	// note that setup + the handshake are bounded by underlying io.
	// (i.e. if TCP or UDP disconnects (or the swarm closes), we're done.
	// Q: why not have a shorter handshake? think about an HTTP server on really slow conns.
	// as long as the conn is live (TCP says its online), it tries its best. we follow suit.)

	sc, err := s.newConnSetup(ctx, c)
	if err != nil {
		log.Debug(err)
		log.Event(ctx, "newConnHandlerDisconnect", lgbl.NetConn(c.NetConn()), lgbl.Error(err))
Jeromy's avatar
Jeromy committed
162
		c.Close() // boom. close it.
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
163 164 165
		return nil
	}

Jeromy's avatar
Jeromy committed
166 167 168
	// if a peer dials us, remove from dial backoff.
	s.backf.Clear(sc.RemotePeer())

Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
169 170
	return sc
}