swarm_listen.go 4.24 KB
Newer Older
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
1 2 3
package swarm

import (
Jeromy's avatar
Jeromy committed
4
	"context"
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
5 6
	"fmt"

Jeromy's avatar
Jeromy committed
7 8 9 10 11 12 13 14
	conn "github.com/libp2p/go-libp2p-conn"
	iconn "github.com/libp2p/go-libp2p-interface-conn"
	lgbl "github.com/libp2p/go-libp2p-loggables"
	mconn "github.com/libp2p/go-libp2p-metrics/conn"
	inet "github.com/libp2p/go-libp2p-net"
	transport "github.com/libp2p/go-libp2p-transport"
	ps "github.com/libp2p/go-peerstream"
	ma "github.com/multiformats/go-multiaddr"
Jeromy's avatar
Jeromy committed
15
)
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
16

17 18 19 20 21 22
func (s *Swarm) AddListenAddr(a ma.Multiaddr) error {
	tpt := s.transportForAddr(a)
	if tpt == nil {
		return fmt.Errorf("no transport for address: %s", a)
	}

23
	d, err := tpt.Dialer(a, transport.ReusePorts)
24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42
	if err != nil {
		return err
	}

	s.dialer.AddDialer(d)

	list, err := tpt.Listen(a)
	if err != nil {
		return err
	}

	err = s.addListener(list)
	if err != nil {
		return err
	}

	return nil
}

Jeromy's avatar
Jeromy committed
43
// Open listeners and reuse-dialers for the given addresses
Jeromy's avatar
Jeromy committed
44 45 46 47
func (s *Swarm) setupInterfaces(addrs []ma.Multiaddr) error {
	errs := make([]error, len(addrs))
	var succeeded int
	for i, a := range addrs {
48
		if err := s.AddListenAddr(a); err != nil {
Jeromy's avatar
Jeromy committed
49
			errs[i] = err
50 51
		} else {
			succeeded++
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
52 53 54
		}
	}

Jeromy's avatar
Jeromy committed
55 56
	for i, e := range errs {
		if e != nil {
Jeromy's avatar
Jeromy committed
57
			log.Warningf("listen on %s failed: %s", addrs[i], errs[i])
Jeromy's avatar
Jeromy committed
58
		}
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
59
	}
60

Jeromy's avatar
Jeromy committed
61 62 63 64
	if succeeded == 0 && len(addrs) > 0 {
		return fmt.Errorf("failed to listen on any addresses: %s", errs)
	}

Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
65 66 67
	return nil
}

Jeromy's avatar
Jeromy committed
68 69 70 71 72 73 74 75 76
func (s *Swarm) transportForAddr(a ma.Multiaddr) transport.Transport {
	for _, t := range s.transports {
		if t.Matches(a) {
			return t
		}
	}

	return nil
}
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
77

Jeromy's avatar
Jeromy committed
78
func (s *Swarm) addListener(tptlist transport.Listener) error {
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
79 80 81 82 83 84

	sk := s.peers.PrivKey(s.local)
	if sk == nil {
		// may be fine for sk to be nil, just log a warning.
		log.Warning("Listener not given PrivateKey, so WILL NOT SECURE conns.")
	}
Jeromy's avatar
Jeromy committed
85

86
	list, err := conn.WrapTransportListenerWithProtector(s.Context(), tptlist, s.local, sk, s.protec)
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
87 88 89 90 91 92
	if err != nil {
		return err
	}

	list.SetAddrFilters(s.Filters)

93
	if cw, ok := list.(conn.ListenerConnWrapper); ok && s.bwc != nil {
Jeromy's avatar
Jeromy committed
94
		cw.SetConnWrapper(func(c transport.Conn) transport.Conn {
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
95 96 97 98
			return mconn.WrapConn(s.bwc, c)
		})
	}

Jeromy's avatar
Jeromy committed
99 100 101
	return s.addConnListener(list)
}

102
func (s *Swarm) addConnListener(list iconn.Listener) error {
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
103 104 105 106 107 108 109 110
	// AddListener to the peerstream Listener. this will begin accepting connections
	// and streams!
	sl, err := s.swarm.AddListener(list)
	if err != nil {
		return err
	}
	log.Debugf("Swarm Listeners at %s", s.ListenAddresses())

Jeromy's avatar
Jeromy committed
111 112
	maddr := list.Multiaddr()

Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134
	// signal to our notifiees on successful conn.
	s.notifyAll(func(n inet.Notifiee) {
		n.Listen((*Network)(s), maddr)
	})

	// go consume peerstream's listen accept errors. note, these ARE errors.
	// they may be killing the listener, and if we get _any_ we should be
	// fixing this in our conn.Listener (to ignore them or handle them
	// differently.)
	go func(ctx context.Context, sl *ps.Listener) {

		// signal to our notifiees closing
		defer s.notifyAll(func(n inet.Notifiee) {
			n.ListenClose((*Network)(s), maddr)
		})

		for {
			select {
			case err, more := <-sl.AcceptErrors():
				if !more {
					return
				}
Jeromy's avatar
Jeromy committed
135
				log.Warningf("swarm listener accept error: %s", err)
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
136 137 138 139 140 141 142 143 144 145 146 147
			case <-ctx.Done():
				return
			}
		}
	}(s.Context(), sl)

	return nil
}

// connHandler is called by the StreamSwarm whenever a new connection is added
// here we configure it slightly. Note that this is sequential, so if anything
// will take a while do it in a goroutine.
Steven Allen's avatar
Steven Allen committed
148
// See https://godoc.org/github.com/libp2p/go-peerstream for more information
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
149 150 151 152 153 154 155 156 157 158 159 160 161
func (s *Swarm) connHandler(c *ps.Conn) *Conn {
	ctx := context.Background()
	// this context is for running the handshake, which -- when receiveing connections
	// -- we have no bound on beyond what the transport protocol bounds it at.
	// note that setup + the handshake are bounded by underlying io.
	// (i.e. if TCP or UDP disconnects (or the swarm closes), we're done.
	// Q: why not have a shorter handshake? think about an HTTP server on really slow conns.
	// as long as the conn is live (TCP says its online), it tries its best. we follow suit.)

	sc, err := s.newConnSetup(ctx, c)
	if err != nil {
		log.Debug(err)
		log.Event(ctx, "newConnHandlerDisconnect", lgbl.NetConn(c.NetConn()), lgbl.Error(err))
Jeromy's avatar
Jeromy committed
162
		c.Close() // boom. close it.
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
163 164 165
		return nil
	}

Jeromy's avatar
Jeromy committed
166 167 168
	// if a peer dials us, remove from dial backoff.
	s.backf.Clear(sc.RemotePeer())

Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
169 170
	return sc
}