dht.go 21 KB
Newer Older
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
1 2
package dht

3
import (
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
4
	"bytes"
Jeromy's avatar
Jeromy committed
5
	"context"
6
	"errors"
7
	"fmt"
Aarsh Shah's avatar
Aarsh Shah committed
8
	"math"
9 10
	"sync"
	"time"
11

12 13 14 15 16 17 18
	"github.com/libp2p/go-libp2p-core/host"
	"github.com/libp2p/go-libp2p-core/network"
	"github.com/libp2p/go-libp2p-core/peer"
	"github.com/libp2p/go-libp2p-core/peerstore"
	"github.com/libp2p/go-libp2p-core/protocol"
	"github.com/libp2p/go-libp2p-core/routing"

19
	"go.opencensus.io/tag"
20

21
	"github.com/libp2p/go-libp2p-kad-dht/metrics"
22
	pb "github.com/libp2p/go-libp2p-kad-dht/pb"
Aarsh Shah's avatar
Aarsh Shah committed
23
	"github.com/libp2p/go-libp2p-kad-dht/providers"
Aarsh Shah's avatar
Aarsh Shah committed
24 25 26
	kb "github.com/libp2p/go-libp2p-kbucket"
	record "github.com/libp2p/go-libp2p-record"
	recpb "github.com/libp2p/go-libp2p-record/pb"
27

Aarsh Shah's avatar
Aarsh Shah committed
28
	"github.com/gogo/protobuf/proto"
29 30
	ds "github.com/ipfs/go-datastore"
	logging "github.com/ipfs/go-log"
Aarsh Shah's avatar
Aarsh Shah committed
31
	"github.com/jbenet/goprocess"
Henrique Dias's avatar
Henrique Dias committed
32
	goprocessctx "github.com/jbenet/goprocess/context"
Steven Allen's avatar
Steven Allen committed
33
	"github.com/multiformats/go-base32"
Adin Schmahmann's avatar
Adin Schmahmann committed
34
	"github.com/multiformats/go-multihash"
35 36
)

Matt Joiner's avatar
Matt Joiner committed
37
// logger is the package-level logger, created under the name "dht".
var logger = logging.Logger("dht")
38

Henrique Dias's avatar
Henrique Dias committed
39 40
// BaseConnMgrScore is the base value of the "kbucket" connection-manager tag
// applied to peers added to the routing table; the common prefix length
// between the peer's ID and our own is added on top of it.
const BaseConnMgrScore = 5

41 42 43
// mode is the internal operating mode of a DHT node.
type mode int

// Enumeration starts at iota + 1, so the zero value of mode matches neither
// of the two modes below.
const (
	// modeServer is the mode recorded by moveToServerMode.
	modeServer mode = iota + 1
	// modeClient is the mode recorded by moveToClientMode.
	modeClient
)

Adin Schmahmann's avatar
Adin Schmahmann committed
48 49 50 51 52
// Protocol ID suffixes; makeDHT appends them to the configured protocol prefix.
const (
	// kad1 identifies the old ("v1") DHT protocol.
	kad1 protocol.ID = "/kad/1.0.0"
	// kad2 identifies the new ("v2") DHT protocol.
	kad2 protocol.ID = "/kad/2.0.0"
)

53
// IpfsDHT is an implementation of Kademlia with S/Kademlia modifications.
54
// It is used to implement the base Routing module.
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
55
type IpfsDHT struct {
56 57 58
	host      host.Host           // the network services we need
	self      peer.ID             // Local peer (yourself)
	peerstore peerstore.Peerstore // Peer Registry
59

60
	datastore ds.Datastore // Local data
61

62
	routingTable *kb.RoutingTable // Array of routing tables for differently distanced nodes
63 64
	// ProviderManager stores & manages the provider records for this Dht peer.
	ProviderManager *providers.ProviderManager
65

Aarsh Shah's avatar
Aarsh Shah committed
66
	birth time.Time // When this peer started up
67

68
	Validator record.Validator
69

70 71
	ctx  context.Context
	proc goprocess.Process
72 73 74

	strmap map[peer.ID]*messageSender
	smlk   sync.Mutex
75

Steven Allen's avatar
Steven Allen committed
76
	plk sync.Mutex
77

78 79
	stripedPutLocks [256]sync.Mutex

80 81
	// DHT protocols we query with. We'll only add peers to our routing
	// table if they speak these protocols.
Adin Schmahmann's avatar
Adin Schmahmann committed
82 83
	protocols []protocol.ID

84
	// DHT protocols we can respond to.
Adin Schmahmann's avatar
Adin Schmahmann committed
85
	serverProtocols []protocol.ID
86

87 88 89 90
	auto   bool
	mode   mode
	modeLk sync.Mutex

91
	bucketSize int
92
	alpha      int // The concurrency parameter per path
Adin Schmahmann's avatar
Adin Schmahmann committed
93
	beta       int // The number of peers closest to a target that must have responded for a query path to terminate
94

95 96 97
	queryPeerFilter        QueryFilterFunc
	routingTablePeerFilter RouteTableFilterFunc

98 99
	autoRefresh           bool
	rtRefreshQueryTimeout time.Duration
Aarsh Shah's avatar
Aarsh Shah committed
100
	rtRefreshInterval     time.Duration
Steven Allen's avatar
Steven Allen committed
101
	triggerRtRefresh      chan chan<- error
Aarsh Shah's avatar
Aarsh Shah committed
102
	triggerSelfLookup     chan chan<- error
Aarsh Shah's avatar
Aarsh Shah committed
103 104

	maxRecordAge time.Duration
105

106 107 108
	// Allows disabling dht subsystems. These should _only_ be set on
	// "forked" DHTs (e.g., DHTs with custom protocols and/or private
	// networks).
109
	enableProviders, enableValues bool
110

Aarsh Shah's avatar
Aarsh Shah committed
111
	// maxLastSuccessfulOutboundThreshold is the max threshold/upper limit on the time duration
Aarsh Shah's avatar
Aarsh Shah committed
112
	// between the current time and the last time we successfully queried a peer.
Aarsh Shah's avatar
Aarsh Shah committed
113
	maxLastSuccessfulOutboundThreshold float64
114 115

	fixLowPeersChan chan struct{}
116 117
}

Matt Joiner's avatar
Matt Joiner committed
118 119 120 121
// Assert that IPFS assumptions about interfaces aren't broken. These aren't a
// guarantee, but we can use them to aid refactoring.
var (
	_ routing.ContentRouting = (*IpfsDHT)(nil)
122
	_ routing.Routing        = (*IpfsDHT)(nil)
Matt Joiner's avatar
Matt Joiner committed
123 124 125 126 127
	_ routing.PeerRouting    = (*IpfsDHT)(nil)
	_ routing.PubKeyFetcher  = (*IpfsDHT)(nil)
	_ routing.ValueStore     = (*IpfsDHT)(nil)
)

128
// New creates a new DHT with the specified host and options.
Aarsh Shah's avatar
Aarsh Shah committed
129 130 131
// Please note that being connected to a DHT peer does not necessarily imply that it's also in the DHT Routing Table.
// If the Routing Table has more than "minRTRefreshThreshold" peers, we consider a peer as a Routing Table candidate ONLY when
// we successfully get a query response from it OR if it send us a query.
132 133
func New(ctx context.Context, h host.Host, options ...Option) (*IpfsDHT, error) {
	var cfg config
Adin Schmahmann's avatar
Adin Schmahmann committed
134
	if err := cfg.apply(append([]Option{defaults}, options...)...); err != nil {
135 136
		return nil, err
	}
137 138 139
	if err := cfg.applyFallbacks(h); err != nil {
		return nil, err
	}
Aarsh Shah's avatar
Aarsh Shah committed
140

Adin Schmahmann's avatar
Adin Schmahmann committed
141 142
	if err := cfg.validate(); err != nil {
		return nil, err
143
	}
Aarsh Shah's avatar
Aarsh Shah committed
144 145 146 147
	dht, err := makeDHT(ctx, h, cfg)
	if err != nil {
		return nil, fmt.Errorf("failed to create DHT, err=%s", err)
	}
Adin Schmahmann's avatar
Adin Schmahmann committed
148

149
	dht.autoRefresh = cfg.routingTable.autoRefresh
Aarsh Shah's avatar
Aarsh Shah committed
150
	dht.rtRefreshInterval = cfg.routingTable.refreshInterval
151
	dht.rtRefreshQueryTimeout = cfg.routingTable.refreshQueryTimeout
152

153 154 155
	dht.maxRecordAge = cfg.maxRecordAge
	dht.enableProviders = cfg.enableProviders
	dht.enableValues = cfg.enableValues
Aarsh Shah's avatar
Aarsh Shah committed
156

157
	dht.Validator = cfg.validator
158

159 160
	switch cfg.mode {
	case ModeAuto:
161 162
		dht.auto = true
		dht.mode = modeClient
163
	case ModeClient:
164 165
		dht.auto = false
		dht.mode = modeClient
166
	case ModeServer:
167 168 169
		dht.auto = false
		dht.mode = modeServer
	default:
170
		return nil, fmt.Errorf("invalid dht mode %d", cfg.mode)
171 172 173 174 175
	}

	if dht.mode == modeServer {
		if err := dht.moveToServerMode(); err != nil {
			return nil, err
176
		}
177
	}
178 179 180 181 182 183 184 185 186 187

	// register for event bus and network notifications
	sn, err := newSubscriberNotifiee(dht)
	if err != nil {
		return nil, err
	}
	dht.proc.Go(sn.subscribe)
	// handle providers
	dht.proc.AddChild(dht.ProviderManager.Process())

Aarsh Shah's avatar
Aarsh Shah committed
188
	dht.startSelfLookup()
189
	dht.startRefreshing()
Aarsh Shah's avatar
Aarsh Shah committed
190 191 192 193

	// go-routine to make sure we ALWAYS have RT peer addresses in the peerstore
	// since RT membership is decoupled from connectivity
	go dht.persistRTPeersInPeerStore()
194 195 196 197

	// listens to the fix low peers chan and tries to fix the Routing Table
	dht.proc.Go(dht.fixLowPeersRoutine)

198 199
	return dht, nil
}
200

201 202 203 204
// NewDHT creates a new DHT object with the given peer as the 'local' host.
// IpfsDHT's initialized with this function will respond to DHT requests,
// whereas IpfsDHT's initialized with NewDHTClient will not.
func NewDHT(ctx context.Context, h host.Host, dstore ds.Batching) *IpfsDHT {
205
	dht, err := New(ctx, h, Datastore(dstore))
206 207 208 209 210 211 212 213 214 215 216
	if err != nil {
		panic(err)
	}
	return dht
}

// NewDHTClient creates a new DHT object with the given peer as the 'local'
// host. IpfsDHT clients initialized with this function will not respond to DHT
// requests. If you need a peer to respond to DHT requests, use NewDHT instead.
// NewDHTClient creates a new DHT object with the given peer as the 'local' host
func NewDHTClient(ctx context.Context, h host.Host, dstore ds.Batching) *IpfsDHT {
Adin Schmahmann's avatar
Adin Schmahmann committed
217
	dht, err := New(ctx, h, Datastore(dstore), Mode(ModeClient))
218 219 220
	if err != nil {
		panic(err)
	}
221 222 223
	return dht
}

Aarsh Shah's avatar
Aarsh Shah committed
224
func makeDHT(ctx context.Context, h host.Host, cfg config) (*IpfsDHT, error) {
225
	var protocols, serverProtocols []protocol.ID
Adin Schmahmann's avatar
Adin Schmahmann committed
226 227

	// check if custom test protocols were set
228
	if cfg.v1CompatibleMode {
229 230 231 232 233 234 235 236 237 238 239 240 241 242
		// In compat mode, query/serve using the old protocol.
		//
		// DO NOT accept requests on the new protocol. Otherwise:
		// 1. We'll end up in V2 routing tables.
		// 2. We'll have V1 peers in our routing table.
		//
		// In other words, we'll pollute the V2 network.
		protocols = []protocol.ID{cfg.protocolPrefix + kad1}
		serverProtocols = []protocol.ID{cfg.protocolPrefix + kad1}
	} else {
		// In v2 mode, serve on both protocols, but only
		// query/accept peers in v2 mode.
		protocols = []protocol.ID{cfg.protocolPrefix + kad2}
		serverProtocols = []protocol.ID{cfg.protocolPrefix + kad2, cfg.protocolPrefix + kad1}
Adin Schmahmann's avatar
Adin Schmahmann committed
243 244
	}

245
	dht := &IpfsDHT{
246 247 248 249 250 251 252 253 254 255 256 257 258 259 260
		datastore:              cfg.datastore,
		self:                   h.ID(),
		peerstore:              h.Peerstore(),
		host:                   h,
		strmap:                 make(map[peer.ID]*messageSender),
		birth:                  time.Now(),
		protocols:              protocols,
		serverProtocols:        serverProtocols,
		bucketSize:             cfg.bucketSize,
		alpha:                  cfg.concurrency,
		beta:                   cfg.resiliency,
		triggerRtRefresh:       make(chan chan<- error),
		triggerSelfLookup:      make(chan chan<- error),
		queryPeerFilter:        cfg.queryPeerFilter,
		routingTablePeerFilter: cfg.routingTable.peerFilter,
261
		fixLowPeersChan:        make(chan struct{}),
Jeromy's avatar
Jeromy committed
262
	}
263

Aarsh Shah's avatar
Aarsh Shah committed
264 265 266 267 268 269 270
	// construct routing table
	rt, err := makeRoutingTable(dht, cfg)
	if err != nil {
		return nil, fmt.Errorf("failed to construct routing table,err=%s", err)
	}
	dht.routingTable = rt

271 272
	// create a DHT proc with the given context
	dht.proc = goprocessctx.WithContext(ctx)
Aarsh Shah's avatar
Aarsh Shah committed
273 274 275 276 277 278

	// create a tagged context derived from the original context
	ctxTags := dht.newContextWithLocalTags(ctx)
	// the DHT context should be done when the process is closed
	dht.ctx = goprocessctx.WithProcessClosing(ctxTags, dht.proc)

279
	dht.ProviderManager = providers.NewProviderManager(dht.ctx, h.ID(), cfg.datastore)
280

Aarsh Shah's avatar
Aarsh Shah committed
281
	return dht, nil
Jeromy's avatar
Jeromy committed
282 283
}

Aarsh Shah's avatar
Aarsh Shah committed
284
func makeRoutingTable(dht *IpfsDHT, cfg config) (*kb.RoutingTable, error) {
Aarsh Shah's avatar
Aarsh Shah committed
285 286 287 288 289 290 291
	// The threshold is calculated based on the expected amount of time that should pass before we
	// query a peer as part of our refresh cycle.
	// To grok the Math Wizardy that produced these exact equations, please be patient as a document explaining it will
	// be published soon.
	l1 := math.Log(float64(1) / float64(defaultBucketSize))                              //(Log(1/K))
	l2 := math.Log(float64(1) - (float64(cfg.concurrency) / float64(defaultBucketSize))) // Log(1 - (alpha / K))
	maxLastSuccessfulOutboundThreshold := l1 / l2 * float64(cfg.routingTable.refreshInterval)
Aarsh Shah's avatar
Aarsh Shah committed
292

Aarsh Shah's avatar
Aarsh Shah committed
293
	self := kb.ConvertPeerID(dht.host.ID())
Aarsh Shah's avatar
Aarsh Shah committed
294

Aarsh Shah's avatar
Aarsh Shah committed
295
	rt, err := kb.NewRoutingTable(cfg.bucketSize, self, time.Minute, dht.host.Peerstore(), maxLastSuccessfulOutboundThreshold)
Aarsh Shah's avatar
Aarsh Shah committed
296
	dht.maxLastSuccessfulOutboundThreshold = maxLastSuccessfulOutboundThreshold
Aarsh Shah's avatar
Aarsh Shah committed
297
	cmgr := dht.host.ConnManager()
Aarsh Shah's avatar
Aarsh Shah committed
298 299 300 301 302 303 304

	rt.PeerAdded = func(p peer.ID) {
		commonPrefixLen := kb.CommonPrefixLen(self, kb.ConvertPeerID(p))
		cmgr.TagPeer(p, "kbucket", BaseConnMgrScore+commonPrefixLen)
	}
	rt.PeerRemoved = func(p peer.ID) {
		cmgr.UntagPeer(p, "kbucket")
305 306 307

		// try to fix the RT
		dht.fixRTIfNeeded()
Aarsh Shah's avatar
Aarsh Shah committed
308 309 310 311
	}

	return rt, err
}
312

313 314 315 316 317 318 319 320 321 322 323 324 325 326 327 328 329 330 331 332 333 334 335 336 337 338
// fixLowPeersRoutine waits for signals on fixLowPeersChan and, whenever the
// routing table holds no more than minRTRefreshThreshold peers, offers every
// currently connected peer to the routing table and, if auto refresh is
// enabled, requests a routing table refresh without blocking.
// It returns when proc starts closing.
func (dht *IpfsDHT) fixLowPeersRoutine(proc goprocess.Process) {
	for {
		select {
		case <-proc.Closing():
			return
		case <-dht.fixLowPeersChan:
		}

		// Enough peers already: nothing to fix.
		if dht.routingTable.Size() > minRTRefreshThreshold {
			continue
		}

		connected := dht.host.Network().Peers()
		for _, pid := range connected {
			dht.peerFound(dht.Context(), pid, false)
		}

		if !dht.autoRefresh {
			continue
		}

		// Best effort: skip the trigger if nobody is ready to receive it.
		select {
		case dht.triggerRtRefresh <- nil:
		default:
		}
	}
}

Aarsh Shah's avatar
Aarsh Shah committed
339 340 341 342 343 344 345 346 347 348 349 350 351 352 353 354 355 356 357
// persistRTPeersInPeerStore periodically refreshes the address TTLs of all
// routing table peers in the peerstore, and returns once the DHT context is
// done.
//
// TODO This is a hack. It SHOULD be removed once
// https://github.com/libp2p/go-libp2p/issues/800 goes in.
func (dht *IpfsDHT) persistRTPeersInPeerStore() {
	ticker := time.NewTicker(peerstore.RecentlyConnectedAddrTTL / 3)
	defer ticker.Stop()

	for {
		select {
		case <-dht.ctx.Done():
			return
		case <-ticker.C:
			for _, pid := range dht.routingTable.ListPeers() {
				dht.peerstore.UpdateAddrs(pid, peerstore.RecentlyConnectedAddrTTL, peerstore.RecentlyConnectedAddrTTL)
			}
		}
	}
}

Jeromy's avatar
Jeromy committed
358
// putValueToPeer stores the given key/value pair at the peer 'p'
359 360
func (dht *IpfsDHT) putValueToPeer(ctx context.Context, p peer.ID, rec *recpb.Record) error {
	pmes := pb.NewMessage(pb.Message_PUT_VALUE, rec.Key, 0)
361
	pmes.Record = rec
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
362
	rpmes, err := dht.sendRequest(ctx, p, pmes)
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
363
	if err != nil {
Steven Allen's avatar
Steven Allen committed
364
		logger.Debugw("failed to put value to peer", "to", p, "key", loggableKeyBytes(rec.Key), "error", err)
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
365 366
		return err
	}
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
367

368
	if !bytes.Equal(rpmes.GetRecord().Value, pmes.GetRecord().Value) {
Steven Allen's avatar
Steven Allen committed
369
		logger.Infow("value not put correctly", "put-message", pmes, "get-message", rpmes)
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
370 371
		return errors.New("value not put correctly")
	}
gpestana's avatar
gpestana committed
372

Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
373
	return nil
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
374 375
}

376 377
// errInvalidRecord is the sentinel returned by getValueOrPeers when a peer
// sends back a record that fails validation.
var errInvalidRecord = errors.New("received invalid record")

378 379
// getValueOrPeers queries a particular peer p for the value for
// key. It returns either the value or a list of closer peers.
380
// NOTE: It will update the dht's peerstore with any new addresses
381
// it finds for the given peer.
382
func (dht *IpfsDHT) getValueOrPeers(ctx context.Context, p peer.ID, key string) (*recpb.Record, []*peer.AddrInfo, error) {
383
	pmes, err := dht.getValueSingle(ctx, p, key)
384
	if err != nil {
385
		return nil, nil, err
386 387
	}

388 389 390
	// Perhaps we were given closer peers
	peers := pb.PBPeersToPeerInfos(pmes.GetCloserPeers())

391
	if record := pmes.GetRecord(); record != nil {
392
		// Success! We were given the value
Steven Allen's avatar
Steven Allen committed
393
		logger.Debug("got value")
394

395
		// make sure record is valid.
396
		err = dht.Validator.Validate(string(record.GetKey()), record.GetValue())
397
		if err != nil {
Steven Allen's avatar
Steven Allen committed
398
			logger.Debug("received invalid record (discarded)")
399 400
			// return a sentinal to signify an invalid record was received
			err = errInvalidRecord
George Antoniadis's avatar
George Antoniadis committed
401
			record = new(recpb.Record)
402
		}
403
		return record, peers, err
404
	}
405

406 407 408 409
	if len(peers) > 0 {
		return nil, peers, nil
	}

410
	return nil, nil, routing.ErrNotFound
411 412
}

413
// getValueSingle simply performs the get value RPC with the given parameters
414
func (dht *IpfsDHT) getValueSingle(ctx context.Context, p peer.ID, key string) (*pb.Message, error) {
415
	pmes := pb.NewMessage(pb.Message_GET_VALUE, []byte(key), 0)
Steven Allen's avatar
Steven Allen committed
416
	return dht.sendRequest(ctx, p, pmes)
Jeromy's avatar
Jeromy committed
417 418
}

419
// getLocal attempts to retrieve the value from the datastore
420
func (dht *IpfsDHT) getLocal(key string) (*recpb.Record, error) {
Steven Allen's avatar
Steven Allen committed
421 422
	logger.Debugw("finding value in datastore", "key", loggableKeyString(key))

423
	rec, err := dht.getRecordFromDatastore(mkDsKey(key))
424
	if err != nil {
Steven Allen's avatar
Steven Allen committed
425
		logger.Warnw("get local failed", "key", key, "error", err)
426 427
		return nil, err
	}
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
428

429
	// Double check the key. Can't hurt.
430
	if rec != nil && string(rec.GetKey()) != key {
Steven Allen's avatar
Steven Allen committed
431
		logger.Errorw("BUG: found a DHT record that didn't match it's key", "expected", key, "got", rec.GetKey())
Steven Allen's avatar
Steven Allen committed
432
		return nil, nil
433 434

	}
435
	return rec, nil
436 437
}

438
// putLocal stores the key value pair in the datastore
439
func (dht *IpfsDHT) putLocal(key string, rec *recpb.Record) error {
440 441
	data, err := proto.Marshal(rec)
	if err != nil {
Steven Allen's avatar
Steven Allen committed
442
		logger.Warnw("failed to put marshal record for local put", "error", err, "key", key)
443 444 445
		return err
	}

446
	return dht.datastore.Put(mkDsKey(key), data)
447
}
448

Aarsh Shah's avatar
Aarsh Shah committed
449
// peerFound signals the routingTable that we've found a peer that
Aarsh Shah's avatar
Aarsh Shah committed
450
// might support the DHT protocol.
Aarsh Shah's avatar
Aarsh Shah committed
451 452 453 454 455 456 457 458 459 460 461
// If we have a connection a peer but no exchange of a query RPC ->
//    LastQueriedAt=time.Now (so we don't ping it for some time for a liveliness check)
//    LastUsefulAt=N/A
// If we connect to a peer and exchange a query RPC ->
//    LastQueriedAt=time.Now (same reason as above)
//    LastUsefulAt=time.Now (so we give it some life in the RT without immediately evicting it)
// If we query a peer we already have in our Routing Table ->
//    LastQueriedAt=time.Now()
//    LastUsefulAt remains unchanged
// If we connect to a peer we already have in the RT but do not exchange a query (rare)
//    Do Nothing.
Aarsh Shah's avatar
Aarsh Shah committed
462
func (dht *IpfsDHT) peerFound(ctx context.Context, p peer.ID, queryPeer bool) {
Steven Allen's avatar
Steven Allen committed
463
	logger.Debugw("peer found", "peer", p)
Aarsh Shah's avatar
Aarsh Shah committed
464 465
	b, err := dht.validRTPeer(p)
	if err != nil {
Steven Allen's avatar
Steven Allen committed
466
		logger.Errorw("failed to validate if peer is a DHT peer", "peer", p, "error", err)
Aarsh Shah's avatar
Aarsh Shah committed
467
	} else if b {
Aarsh Shah's avatar
Aarsh Shah committed
468
		newlyAdded, err := dht.routingTable.TryAddPeer(p, queryPeer)
Steven Allen's avatar
Steven Allen committed
469 470 471 472
		if err != nil {
			// peer not added.
			return
		}
Aarsh Shah's avatar
Aarsh Shah committed
473

Aarsh Shah's avatar
Aarsh Shah committed
474
		// If we freshly added the peer because of a query, we need to ensure we override the "zero" lastUsefulAt
Aarsh Shah's avatar
Aarsh Shah committed
475
		// value that must have been set in the Routing Table for this peer when it was first added during a connection.
Aarsh Shah's avatar
Aarsh Shah committed
476 477 478 479 480 481
		if newlyAdded && queryPeer {
			dht.routingTable.UpdateLastUsefulAt(p, time.Now())
		} else if queryPeer {
			// the peer is already in our RT, but we just successfully queried it and so let's give it a
			// bump on the query time so we don't ping it too soon for a liveliness check.
			dht.routingTable.UpdateLastSuccessfulOutboundQueryAt(p, time.Now())
Aarsh Shah's avatar
Aarsh Shah committed
482 483
		}
	}
Aarsh Shah's avatar
Aarsh Shah committed
484 485
}

Aarsh Shah's avatar
Aarsh Shah committed
486
// peerStoppedDHT signals the routing table that a peer is unable to responsd to DHT queries anymore.
Aarsh Shah's avatar
Aarsh Shah committed
487
func (dht *IpfsDHT) peerStoppedDHT(ctx context.Context, p peer.ID) {
Steven Allen's avatar
Steven Allen committed
488
	logger.Debugw("peer stopped dht", "peer", p)
Aarsh Shah's avatar
Aarsh Shah committed
489 490
	// A peer that does not support the DHT protocol is dead for us.
	// There's no point in talking to anymore till it starts supporting the DHT protocol again.
Aarsh Shah's avatar
Aarsh Shah committed
491 492 493
	dht.routingTable.RemovePeer(p)

	// since we lost a peer from the RT, we should do this here
494
	dht.fixRTIfNeeded()
Aarsh Shah's avatar
Aarsh Shah committed
495 496
}

497 498 499 500
func (dht *IpfsDHT) fixRTIfNeeded() {
	select {
	case dht.fixLowPeersChan <- struct{}{}:
	default:
Aarsh Shah's avatar
Aarsh Shah committed
501
	}
502
}
Jeromy's avatar
Jeromy committed
503

Jeromy's avatar
Jeromy committed
504
// FindLocal looks for a peer with a given ID connected to this dht and returns the peer and the table it was found in.
505
func (dht *IpfsDHT) FindLocal(id peer.ID) peer.AddrInfo {
506
	switch dht.host.Network().Connectedness(id) {
507
	case network.Connected, network.CanConnect:
508 509
		return dht.peerstore.PeerInfo(id)
	default:
510
		return peer.AddrInfo{}
Jeromy's avatar
Jeromy committed
511 512
	}
}
513

Jeromy's avatar
Jeromy committed
514
// findPeerSingle asks peer 'p' if they know where the peer with id 'id' is
515
func (dht *IpfsDHT) findPeerSingle(ctx context.Context, p peer.ID, id peer.ID) (*pb.Message, error) {
516
	pmes := pb.NewMessage(pb.Message_FIND_NODE, []byte(id), 0)
Steven Allen's avatar
Steven Allen committed
517
	return dht.sendRequest(ctx, p, pmes)
518
}
519

Adin Schmahmann's avatar
Adin Schmahmann committed
520 521
func (dht *IpfsDHT) findProvidersSingle(ctx context.Context, p peer.ID, key multihash.Multihash) (*pb.Message, error) {
	pmes := pb.NewMessage(pb.Message_GET_PROVIDERS, key, 0)
Steven Allen's avatar
Steven Allen committed
522
	return dht.sendRequest(ctx, p, pmes)
Jeromy's avatar
Jeromy committed
523 524
}

525
// nearestPeersToQuery returns the routing tables closest peers.
526
func (dht *IpfsDHT) nearestPeersToQuery(pmes *pb.Message, count int) []peer.ID {
527
	closer := dht.routingTable.NearestPeers(kb.ConvertKey(string(pmes.GetKey())), count)
528 529 530
	return closer
}

Aarsh Shah's avatar
Aarsh Shah committed
531
// betterPeersToQuery returns nearestPeersToQuery with some additional filtering
532
func (dht *IpfsDHT) betterPeersToQuery(pmes *pb.Message, from peer.ID, count int) []peer.ID {
533
	closer := dht.nearestPeersToQuery(pmes, count)
534 535 536

	// no node? nil
	if closer == nil {
Steven Allen's avatar
Steven Allen committed
537
		logger.Infow("no closer peers to send", from)
538 539 540
		return nil
	}

Steven Allen's avatar
Steven Allen committed
541
	filtered := make([]peer.ID, 0, len(closer))
Jeromy's avatar
Jeromy committed
542 543 544
	for _, clp := range closer {

		// == to self? thats bad
Jeromy's avatar
Jeromy committed
545
		if clp == dht.self {
Matt Joiner's avatar
Matt Joiner committed
546
			logger.Error("BUG betterPeersToQuery: attempted to return self! this shouldn't happen...")
547 548
			return nil
		}
549
		// Dont send a peer back themselves
550
		if clp == from {
551 552 553
			continue
		}

Jeromy's avatar
Jeromy committed
554
		filtered = append(filtered, clp)
555 556
	}

557 558
	// ok seems like closer nodes
	return filtered
559 560
}

561 562 563 564 565 566 567 568 569 570 571 572 573 574 575 576 577 578
// setMode switches the DHT to mode m while holding modeLk.
// It is a no-op when the DHT is already in that mode, and returns an error for
// a mode that is neither modeServer nor modeClient.
func (dht *IpfsDHT) setMode(m mode) error {
	dht.modeLk.Lock()
	defer dht.modeLk.Unlock()

	if dht.mode == m {
		return nil
	}

	if m == modeServer {
		return dht.moveToServerMode()
	}
	if m == modeClient {
		return dht.moveToClientMode()
	}
	return fmt.Errorf("unrecognized dht mode: %d", m)
}

Adin Schmahmann's avatar
Adin Schmahmann committed
579 580 581
// moveToServerMode advertises (via libp2p identify updates) that we are able to respond to DHT queries and sets the appropriate stream handlers.
// Note: We may support responding to queries with protocols aside from our primary ones in order to support
// interoperability with older versions of the DHT protocol.
582 583
func (dht *IpfsDHT) moveToServerMode() error {
	dht.mode = modeServer
Adin Schmahmann's avatar
Adin Schmahmann committed
584
	for _, p := range dht.serverProtocols {
585 586 587 588 589
		dht.host.SetStreamHandler(p, dht.handleNewStream)
	}
	return nil
}

Adin Schmahmann's avatar
Adin Schmahmann committed
590 591 592 593 594
// moveToClientMode stops advertising (and rescinds advertisements via libp2p identify updates) that we are able to
// respond to DHT queries and removes the appropriate stream handlers. We also kill all inbound streams that were
// utilizing the handled protocols.
// Note: We may support responding to queries with protocols aside from our primary ones in order to support
// interoperability with older versions of the DHT protocol.
595 596
func (dht *IpfsDHT) moveToClientMode() error {
	dht.mode = modeClient
Adin Schmahmann's avatar
Adin Schmahmann committed
597
	for _, p := range dht.serverProtocols {
598 599 600 601
		dht.host.RemoveStreamHandler(p)
	}

	pset := make(map[protocol.ID]bool)
Adin Schmahmann's avatar
Adin Schmahmann committed
602
	for _, p := range dht.serverProtocols {
603 604 605 606 607 608 609
		pset[p] = true
	}

	for _, c := range dht.host.Network().Conns() {
		for _, s := range c.GetStreams() {
			if pset[s.Protocol()] {
				if s.Stat().Direction == network.DirInbound {
Steven Allen's avatar
Steven Allen committed
610
					_ = s.Reset()
611 612 613 614 615 616 617 618 619 620 621 622 623
				}
			}
		}
	}
	return nil
}

// getMode returns the current mode, reading it while holding modeLk.
func (dht *IpfsDHT) getMode() mode {
	dht.modeLk.Lock()
	current := dht.mode
	dht.modeLk.Unlock()
	return current
}

624 625 626 627 628 629 630 631 632 633
// Context returns the DHT's context.
func (dht *IpfsDHT) Context() context.Context {
	return dht.ctx
}

// Process returns the DHT's process.
func (dht *IpfsDHT) Process() goprocess.Process {
	return dht.proc
}

ZhengQi's avatar
ZhengQi committed
634 635 636 637 638
// RoutingTable returns the DHT's routing table.
func (dht *IpfsDHT) RoutingTable() *kb.RoutingTable {
	return dht.routingTable
}

639 640 641 642
// Close closes the DHT's process and returns the result of that call.
func (dht *IpfsDHT) Close() error {
	return dht.proc.Close()
}
643 644 645 646

// mkDsKey converts s into a datastore key by encoding its bytes as unpadded
// standard base32.
func mkDsKey(s string) ds.Key {
	return ds.NewKey(base32.RawStdEncoding.EncodeToString([]byte(s)))
}
647 648 649 650 651 652 653 654 655 656 657 658 659 660 661 662 663

// PeerID returns the ID of the local peer.
func (dht *IpfsDHT) PeerID() peer.ID {
	return dht.self
}

// PeerKey returns the local peer's ID converted with kb.ConvertPeerID, the
// same conversion used for the routing table's local ID.
func (dht *IpfsDHT) PeerKey() []byte {
	return kb.ConvertPeerID(dht.self)
}

// Host returns the libp2p host this DHT was created with.
func (dht *IpfsDHT) Host() host.Host {
	return dht.host
}

func (dht *IpfsDHT) Ping(ctx context.Context, p peer.ID) error {
	req := pb.NewMessage(pb.Message_PING, nil, 0)
	resp, err := dht.sendRequest(ctx, p, req)
	if err != nil {
Steven Allen's avatar
Steven Allen committed
664
		return fmt.Errorf("sending request: %w", err)
665 666
	}
	if resp.Type != pb.Message_PING {
Steven Allen's avatar
Steven Allen committed
667
		return fmt.Errorf("got unexpected response type: %v", resp.Type)
668 669 670
	}
	return nil
}
671 672 673 674 675 676 677 678 679 680 681 682 683 684 685 686

// newContextWithLocalTags derives a context from ctx with the PeerID and
// InstanceID tag keys populated for this DHT. Any extraTags are applied first,
// followed by the two local tags.
func (dht *IpfsDHT) newContextWithLocalTags(ctx context.Context, extraTags ...tag.Mutator) context.Context {
	peerTag := tag.Upsert(metrics.KeyPeerID, dht.self.Pretty())
	instanceTag := tag.Upsert(metrics.KeyInstanceID, fmt.Sprintf("%p", dht))
	mutators := append(extraTags, peerTag, instanceTag)

	// ignoring error as it is unrelated to the actual function of this code.
	tagged, _ := tag.New(ctx, mutators...)
	return tagged
}