notif.go 2.84 KB
Newer Older
1 2 3
package dht

import (
4
	"context"
5 6
	"io"

7 8
	inet "github.com/libp2p/go-libp2p-net"
	ma "github.com/multiformats/go-multiaddr"
Jeromy's avatar
Jeromy committed
9
	mstream "github.com/multiformats/go-multistream"
10 11 12 13 14 15 16 17 18
)

// netNotifiee defines methods to be used with the IpfsDHT
type netNotifiee IpfsDHT

func (nn *netNotifiee) DHT() *IpfsDHT {
	return (*IpfsDHT)(nn)
}

19 20 21 22 23
type peerTracker struct {
	refcount int
	cancel   func()
}

24 25 26
func (nn *netNotifiee) Connected(n inet.Network, v inet.Conn) {
	dht := nn.DHT()
	select {
27
	case <-dht.Process().Closing():
28
		return
29
	default:
30
	}
31

32 33 34 35 36
	dht.plk.Lock()
	defer dht.plk.Unlock()

	conn, ok := nn.peers[v.RemotePeer()]
	if ok {
Steven Allen's avatar
Steven Allen committed
37
		conn.refcount++
38 39 40 41 42 43 44 45 46 47
		return
	}

	ctx, cancel := context.WithCancel(dht.Context())

	nn.peers[v.RemotePeer()] = &peerTracker{
		refcount: 1,
		cancel:   cancel,
	}

Steven Allen's avatar
Steven Allen committed
48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75
	// Note: We *could* just check the peerstore to see if the remote side supports the dht
	// protocol, but its not clear that that information will make it into the peerstore
	// by the time this notification is sent. So just to be very careful, we brute force this
	// and open a new stream
	go nn.testConnection(ctx, v)

}

func (nn *netNotifiee) testConnection(ctx context.Context, v inet.Conn) {
	dht := nn.DHT()
	for {
		s, err := dht.host.NewStream(ctx, v.RemotePeer(), ProtocolDHT, ProtocolDHTOld)

		switch err {
		case nil:
			s.Close()
			dht.plk.Lock()

			// Check if canceled under the lock.
			if ctx.Err() == nil {
				dht.Update(dht.Context(), v.RemotePeer())
			}

			dht.plk.Unlock()
		case io.EOF:
			if ctx.Err() == nil {
				// Connection died but we may still have *an* open connection (context not canceled) so try again.
				continue
76
			}
Steven Allen's avatar
Steven Allen committed
77 78 79 80 81
		case mstream.ErrNotSupported:
			// Client mode only, don't bother adding them to our routing table
		default:
			// real error? thats odd
			log.Errorf("checking dht client type: %s", err)
82
		}
Steven Allen's avatar
Steven Allen committed
83 84
		return
	}
85 86 87 88 89
}

func (nn *netNotifiee) Disconnected(n inet.Network, v inet.Conn) {
	dht := nn.DHT()
	select {
90
	case <-dht.Process().Closing():
91
		return
92
	default:
93
	}
94

95
	p := v.RemotePeer()
96

97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117
	func() {
		dht.plk.Lock()
		defer dht.plk.Unlock()

		conn, ok := nn.peers[p]
		if !ok {
			// Unmatched disconnects are fine. It just means that we were
			// already connected when we registered the listener.
			return
		}
		conn.refcount -= 1
		if conn.refcount == 0 {
			delete(nn.peers, p)
			conn.cancel()
			dht.routingTable.Remove(p)
		}
	}()

	dht.smlk.Lock()
	defer dht.smlk.Unlock()
	ms, ok := dht.strmap[p]
118 119 120
	if !ok {
		return
	}
121 122 123 124 125 126 127 128
	delete(dht.strmap, p)

	// Do this asynchronously as ms.lk can block for a while.
	go func() {
		ms.lk.Lock()
		defer ms.lk.Unlock()
		ms.invalidate()
	}()
129 130 131 132
}

func (nn *netNotifiee) OpenedStream(n inet.Network, v inet.Stream) {}
func (nn *netNotifiee) ClosedStream(n inet.Network, v inet.Stream) {}
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
133 134
func (nn *netNotifiee) Listen(n inet.Network, a ma.Multiaddr)      {}
func (nn *netNotifiee) ListenClose(n inet.Network, a ma.Multiaddr) {}