swarm_listen.go 4.26 KB
Newer Older
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
1 2 3 4 5
package swarm

import (
	"fmt"

Jeromy's avatar
Jeromy committed
6
	lgbl "github.com/ipfs/go-libp2p/loggables"
Jeromy's avatar
Jeromy committed
7
	mconn "github.com/ipfs/go-libp2p/p2p/metrics/conn"
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
8 9
	inet "github.com/ipfs/go-libp2p/p2p/net"
	conn "github.com/ipfs/go-libp2p/p2p/net/conn"
Jeromy's avatar
Jeromy committed
10 11
	transport "github.com/ipfs/go-libp2p/p2p/net/transport"

12
	ps "gx/ipfs/QmZK81vcgMhpb2t7GNbozk7qzt6Rj4zFqitpvsWT9mduW8/go-peerstream"
Jeromy's avatar
Jeromy committed
13
	context "gx/ipfs/QmZy2y8t9zQH2a1b8q2ZSLKp17ATuJoCNxxyMFG5qFExpt/go-net/context"
Jeromy's avatar
Jeromy committed
14
	ma "gx/ipfs/QmcobAGsCjYt5DXoq9et9L8yR8er7o7Cu3DTvpaq12jYSz/go-multiaddr"
Jeromy's avatar
Jeromy committed
15
)
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
16

Jeromy's avatar
Jeromy committed
17
// Open listeners and reuse-dialers for the given addresses
Jeromy's avatar
Jeromy committed
18 19 20 21 22 23 24 25 26
func (s *Swarm) setupInterfaces(addrs []ma.Multiaddr) error {
	errs := make([]error, len(addrs))
	var succeeded int
	for i, a := range addrs {
		tpt := s.transportForAddr(a)
		if tpt == nil {
			errs[i] = fmt.Errorf("no transport for address: %s", a)
			continue
		}
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
27

Jeromy's avatar
Jeromy committed
28 29 30 31
		d, err := tpt.Dialer(a, transport.TimeoutOpt(DialTimeout), transport.ReusePorts)
		if err != nil {
			errs[i] = err
			continue
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
32 33
		}

Jeromy's avatar
Jeromy committed
34
		s.dialer.AddDialer(d)
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
35

Jeromy's avatar
Jeromy committed
36
		list, err := tpt.Listen(a)
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
37
		if err != nil {
Jeromy's avatar
Jeromy committed
38 39
			errs[i] = err
			continue
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
40
		}
Jeromy's avatar
Jeromy committed
41 42 43 44 45 46 47

		err = s.addListener(list)
		if err != nil {
			errs[i] = err
			continue
		}
		succeeded++
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
48 49
	}

Jeromy's avatar
Jeromy committed
50 51 52 53
	for i, e := range errs {
		if e != nil {
			log.Warning("listen on %s failed: %s", addrs[i], errs[i])
		}
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
54
	}
Jeromy's avatar
Jeromy committed
55 56 57 58
	if succeeded == 0 && len(addrs) > 0 {
		return fmt.Errorf("failed to listen on any addresses: %s", errs)
	}

Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
59 60 61
	return nil
}

Jeromy's avatar
Jeromy committed
62 63 64 65 66 67 68 69 70
func (s *Swarm) transportForAddr(a ma.Multiaddr) transport.Transport {
	for _, t := range s.transports {
		if t.Matches(a) {
			return t
		}
	}

	return nil
}
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
71

Jeromy's avatar
Jeromy committed
72
func (s *Swarm) addListener(tptlist transport.Listener) error {
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
73 74 75 76 77 78

	sk := s.peers.PrivKey(s.local)
	if sk == nil {
		// may be fine for sk to be nil, just log a warning.
		log.Warning("Listener not given PrivateKey, so WILL NOT SECURE conns.")
	}
Jeromy's avatar
Jeromy committed
79 80

	list, err := conn.WrapTransportListener(s.Context(), tptlist, s.local, sk)
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
81 82 83 84 85 86 87
	if err != nil {
		return err
	}

	list.SetAddrFilters(s.Filters)

	if cw, ok := list.(conn.ListenerConnWrapper); ok {
Jeromy's avatar
Jeromy committed
88
		cw.SetConnWrapper(func(c transport.Conn) transport.Conn {
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
89 90 91 92
			return mconn.WrapConn(s.bwc, c)
		})
	}

Jeromy's avatar
Jeromy committed
93 94 95 96
	return s.addConnListener(list)
}

func (s *Swarm) addConnListener(list conn.Listener) error {
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
97 98 99 100 101 102 103 104
	// AddListener to the peerstream Listener. this will begin accepting connections
	// and streams!
	sl, err := s.swarm.AddListener(list)
	if err != nil {
		return err
	}
	log.Debugf("Swarm Listeners at %s", s.ListenAddresses())

Jeromy's avatar
Jeromy committed
105 106
	maddr := list.Multiaddr()

Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128
	// signal to our notifiees on successful conn.
	s.notifyAll(func(n inet.Notifiee) {
		n.Listen((*Network)(s), maddr)
	})

	// go consume peerstream's listen accept errors. note, these ARE errors.
	// they may be killing the listener, and if we get _any_ we should be
	// fixing this in our conn.Listener (to ignore them or handle them
	// differently.)
	go func(ctx context.Context, sl *ps.Listener) {

		// signal to our notifiees closing
		defer s.notifyAll(func(n inet.Notifiee) {
			n.ListenClose((*Network)(s), maddr)
		})

		for {
			select {
			case err, more := <-sl.AcceptErrors():
				if !more {
					return
				}
Jeromy's avatar
Jeromy committed
129
				log.Warningf("swarm listener accept error: %s", err)
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155
			case <-ctx.Done():
				return
			}
		}
	}(s.Context(), sl)

	return nil
}

// connHandler is called by the StreamSwarm whenever a new connection is added
// here we configure it slightly. Note that this is sequential, so if anything
// will take a while do it in a goroutine.
// See https://godoc.org/github.com/jbenet/go-peerstream for more information
func (s *Swarm) connHandler(c *ps.Conn) *Conn {
	ctx := context.Background()
	// this context is for running the handshake, which -- when receiveing connections
	// -- we have no bound on beyond what the transport protocol bounds it at.
	// note that setup + the handshake are bounded by underlying io.
	// (i.e. if TCP or UDP disconnects (or the swarm closes), we're done.
	// Q: why not have a shorter handshake? think about an HTTP server on really slow conns.
	// as long as the conn is live (TCP says its online), it tries its best. we follow suit.)

	sc, err := s.newConnSetup(ctx, c)
	if err != nil {
		log.Debug(err)
		log.Event(ctx, "newConnHandlerDisconnect", lgbl.NetConn(c.NetConn()), lgbl.Error(err))
Jeromy's avatar
Jeromy committed
156
		c.Close() // boom. close it.
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
157 158 159
		return nil
	}

Jeromy's avatar
Jeromy committed
160 161 162
	// if a peer dials us, remove from dial backoff.
	s.backf.Clear(sc.RemotePeer())

Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
163 164
	return sc
}