subscriber_notifee.go 5.74 KB
Newer Older
1 2 3
package dht

import (
4 5
	"fmt"

6 7
	"github.com/libp2p/go-libp2p-core/event"
	"github.com/libp2p/go-libp2p-core/network"
Adin Schmahmann's avatar
Adin Schmahmann committed
8
	"github.com/libp2p/go-libp2p-core/peer"
9 10 11 12

	"github.com/libp2p/go-eventbus"

	"github.com/jbenet/goprocess"
13
	ma "github.com/multiformats/go-multiaddr"
14 15 16 17 18
)

// subscriberNotifee implements network.Notifee and also manages the subscriber to the event bus. We consume peer
// identification events to trigger inclusion in the routing table, and we consume Disconnected events to eject peers
// from it.
19 20 21
type subscriberNotifee struct {
	dht  *IpfsDHT
	subs event.Subscription
22 23
}

24 25
func newSubscriberNotifiee(dht *IpfsDHT) (*subscriberNotifee, error) {
	bufSize := eventbus.BufSize(256)
26 27

	evts := []interface{}{
28 29 30 31 32 33
		// register for event bus notifications of when peers successfully complete identification in order to update
		// the routing table
		new(event.EvtPeerIdentificationCompleted),

		// register for event bus protocol ID changes in order to update the routing table
		new(event.EvtPeerProtocolsUpdated),
Aarsh Shah's avatar
Aarsh Shah committed
34 35 36 37

		// register for event bus notifications for when our local address/addresses change so we can
		// advertise those to the network
		new(event.EvtLocalAddressesUpdated),
38 39 40 41
	}

	// register for event bus local routability changes in order to trigger switching between client and server modes
	// only register for events if the DHT is operating in ModeAuto
42
	if dht.auto == ModeAuto || dht.auto == ModeAutoServer {
43
		evts = append(evts, new(event.EvtLocalReachabilityChanged))
44 45
	}

46
	subs, err := dht.host.EventBus().Subscribe(evts, bufSize)
47
	if err != nil {
48
		return nil, fmt.Errorf("dht could not subscribe to eventbus events; err: %s", err)
49 50
	}

51 52 53 54 55 56 57 58 59 60 61 62 63 64 65
	nn := &subscriberNotifee{
		dht:  dht,
		subs: subs,
	}

	// register for network notifications
	dht.host.Network().Notify(nn)

	return nn, nil
}

func (nn *subscriberNotifee) subscribe(proc goprocess.Process) {
	dht := nn.dht
	defer dht.host.Network().StopNotify(nn)
	defer nn.subs.Close()
66 67 68

	for {
		select {
69
		case e, more := <-nn.subs.Out():
70 71 72 73
			if !more {
				return
			}

74
			switch evt := e.(type) {
Aarsh Shah's avatar
Aarsh Shah committed
75 76 77 78 79 80
			case event.EvtLocalAddressesUpdated:
				// when our address changes, we should proactively tell our closest peers about it so
				// we become discoverable quickly. The Identify protocol will push a signed peer record
				// with our new address to all peers we are connected to. However, we might not necessarily be connected
				// to our closet peers & so in the true spirit of Zen, searching for ourself in the network really is the best way
				// to to forge connections with those matter.
81 82 83
				if dht.autoRefresh || dht.testAddressUpdateProcessing {
					dht.rtRefreshManager.RefreshNoWait()
				}
84
			case event.EvtPeerProtocolsUpdated:
85
				handlePeerChangeEvent(dht, evt.Peer)
Aarsh Shah's avatar
Aarsh Shah committed
86
			case event.EvtPeerIdentificationCompleted:
87
				handlePeerChangeEvent(dht, evt.Peer)
88
			case event.EvtLocalReachabilityChanged:
89
				if dht.auto == ModeAuto || dht.auto == ModeAutoServer {
90 91 92 93
					handleLocalReachabilityChangedEvent(dht, evt)
				} else {
					// something has gone really wrong if we get an event we did not subscribe to
					logger.Errorf("received LocalReachabilityChanged event that was not subscribed to")
94
				}
95 96 97
			default:
				// something has gone really wrong if we get an event for another type
				logger.Errorf("got wrong type from subscription: %T", e)
98 99 100 101 102 103 104
			}
		case <-proc.Closing():
			return
		}
	}
}

105 106
func handlePeerChangeEvent(dht *IpfsDHT, p peer.ID) {
	valid, err := dht.validRTPeer(p)
107 108 109
	if err != nil {
		logger.Errorf("could not check peerstore for protocol support: err: %s", err)
		return
Aarsh Shah's avatar
Aarsh Shah committed
110
	} else if valid {
111
		dht.peerFound(dht.ctx, p, false)
112
		dht.fixRTIfNeeded()
113 114
	} else {
		dht.peerStoppedDHT(dht.ctx, p)
115 116 117 118 119 120 121
	}
}

func handleLocalReachabilityChangedEvent(dht *IpfsDHT, e event.EvtLocalReachabilityChanged) {
	var target mode

	switch e.Reachability {
122
	case network.ReachabilityPrivate:
123
		target = modeClient
124 125 126 127 128 129
	case network.ReachabilityUnknown:
		if dht.auto == ModeAutoServer {
			target = modeServer
		} else {
			target = modeClient
		}
130 131 132 133 134 135 136 137 138
	case network.ReachabilityPublic:
		target = modeServer
	}

	logger.Infof("processed event %T; performing dht mode switch", e)

	err := dht.setMode(target)
	// NOTE: the mode will be printed out as a decimal.
	if err == nil {
Steven Allen's avatar
Steven Allen committed
139
		logger.Infow("switched DHT mode successfully", "mode", target)
140
	} else {
Steven Allen's avatar
Steven Allen committed
141
		logger.Errorw("switching DHT mode failed", "mode", target, "error", err)
142 143 144
	}
}

Adin Schmahmann's avatar
Adin Schmahmann committed
145 146 147 148
// validRTPeer returns true if the peer supports the DHT protocol and false otherwise. Supporting the DHT protocol means
// supporting the primary protocols, we do not want to add peers that are speaking obsolete secondary protocols to our
// routing table
func (dht *IpfsDHT) validRTPeer(p peer.ID) (bool, error) {
149 150
	b, err := dht.peerstore.FirstSupportedProtocol(p, dht.protocolsStrs...)
	if len(b) == 0 || err != nil {
Adin Schmahmann's avatar
Adin Schmahmann committed
151 152 153
		return false, err
	}

154
	return dht.routingTablePeerFilter == nil || dht.routingTablePeerFilter(dht, dht.Host().Network().ConnsToPeer(p)), nil
Adin Schmahmann's avatar
Adin Schmahmann committed
155 156
}

157
func (nn *subscriberNotifee) Disconnected(n network.Network, v network.Conn) {
158
	dht := nn.dht
159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175
	select {
	case <-dht.Process().Closing():
		return
	default:
	}

	p := v.RemotePeer()

	// Lock and check to see if we're still connected. We lock to make sure
	// we don't concurrently process a connect event.
	dht.plk.Lock()
	defer dht.plk.Unlock()
	if dht.host.Network().Connectedness(p) == network.Connected {
		// We're still connected.
		return
	}

176
	dht.msgSender.streamDisconnect(dht.Context(), p)
177 178
}

decanus's avatar
removed  
decanus committed
179 180 181 182 183
// Connected is a no-op; it exists only to satisfy network.Notifee.
func (nn *subscriberNotifee) Connected(network.Network, network.Conn) {}

// OpenedStream is a no-op; it exists only to satisfy network.Notifee.
func (nn *subscriberNotifee) OpenedStream(network.Network, network.Stream) {}

// ClosedStream is a no-op; it exists only to satisfy network.Notifee.
func (nn *subscriberNotifee) ClosedStream(network.Network, network.Stream) {}

// Listen is a no-op; it exists only to satisfy network.Notifee.
func (nn *subscriberNotifee) Listen(network.Network, ma.Multiaddr) {}

// ListenClose is a no-op; it exists only to satisfy network.Notifee.
func (nn *subscriberNotifee) ListenClose(network.Network, ma.Multiaddr) {}