dht.go 21.7 KB
Newer Older
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
1 2
package dht

3
import (
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
4
	"bytes"
Jeromy's avatar
Jeromy committed
5
	"context"
6
	"errors"
7
	"fmt"
Aarsh Shah's avatar
Aarsh Shah committed
8
	"math"
9 10
	"sync"
	"time"
11

12 13 14 15 16 17
	"github.com/libp2p/go-libp2p-core/host"
	"github.com/libp2p/go-libp2p-core/network"
	"github.com/libp2p/go-libp2p-core/peer"
	"github.com/libp2p/go-libp2p-core/peerstore"
	"github.com/libp2p/go-libp2p-core/protocol"
	"github.com/libp2p/go-libp2p-core/routing"
18
	"go.uber.org/zap"
19

20
	"go.opencensus.io/tag"
21

22
	"github.com/libp2p/go-libp2p-kad-dht/metrics"
23
	pb "github.com/libp2p/go-libp2p-kad-dht/pb"
Aarsh Shah's avatar
Aarsh Shah committed
24
	"github.com/libp2p/go-libp2p-kad-dht/providers"
Aarsh Shah's avatar
Aarsh Shah committed
25 26 27
	kb "github.com/libp2p/go-libp2p-kbucket"
	record "github.com/libp2p/go-libp2p-record"
	recpb "github.com/libp2p/go-libp2p-record/pb"
28

Aarsh Shah's avatar
Aarsh Shah committed
29
	"github.com/gogo/protobuf/proto"
30 31
	ds "github.com/ipfs/go-datastore"
	logging "github.com/ipfs/go-log"
Aarsh Shah's avatar
Aarsh Shah committed
32
	"github.com/jbenet/goprocess"
Henrique Dias's avatar
Henrique Dias committed
33
	goprocessctx "github.com/jbenet/goprocess/context"
Steven Allen's avatar
Steven Allen committed
34
	"github.com/multiformats/go-base32"
Adin Schmahmann's avatar
Adin Schmahmann committed
35
	"github.com/multiformats/go-multihash"
36 37
)

38 39 40 41
var (
	// logger is the package-level sugared logger, registered under the
	// "dht" subsystem name.
	logger     = logging.Logger("dht")
	// baseLogger is the desugared form of logger; peerFound uses its
	// Check/Write API so the log fields are only built when the level is enabled.
	baseLogger = logger.Desugar()
)
42

Alan Shaw's avatar
Alan Shaw committed
43 44
// BaseConnMgrScore is the base of the score set on the connection manager "kbucket" tag.
// It is added with the common prefix length between two peer IDs.
Henrique Dias's avatar
Henrique Dias committed
45 46
const BaseConnMgrScore = 5

47 48 49
// mode is the internal operating mode of the DHT (server or client).
type mode int

// Internal operating modes. Numbering starts at 1, so the zero value of
// mode matches neither constant.
const (
	// modeServer: stream handlers for the server protocols are registered.
	modeServer mode = iota + 1
	// modeClient: stream handlers for the server protocols are removed.
	modeClient
)

Adin Schmahmann's avatar
Adin Schmahmann committed
54 55 56 57 58
// Protocol ID suffixes for the two DHT protocol versions. makeDHT appends
// them to the configured protocol prefix to build the full protocol IDs.
const (
	kad1 protocol.ID = "/kad/1.0.0"
	kad2 protocol.ID = "/kad/2.0.0"
)

59
// IpfsDHT is an implementation of Kademlia with S/Kademlia modifications.
60
// It is used to implement the base Routing module.
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
61
type IpfsDHT struct {
62 63 64
	host      host.Host           // the network services we need
	self      peer.ID             // Local peer (yourself)
	peerstore peerstore.Peerstore // Peer Registry
65

66
	datastore ds.Datastore // Local data
67

68
	routingTable *kb.RoutingTable // Array of routing tables for differently distanced nodes
69 70
	// ProviderManager stores & manages the provider records for this Dht peer.
	ProviderManager *providers.ProviderManager
71

Aarsh Shah's avatar
Aarsh Shah committed
72
	birth time.Time // When this peer started up
73

74
	Validator record.Validator
75

76 77
	ctx  context.Context
	proc goprocess.Process
78 79 80

	strmap map[peer.ID]*messageSender
	smlk   sync.Mutex
81

Steven Allen's avatar
Steven Allen committed
82
	plk sync.Mutex
83

84 85
	stripedPutLocks [256]sync.Mutex

86 87
	// DHT protocols we query with. We'll only add peers to our routing
	// table if they speak these protocols.
Adin Schmahmann's avatar
Adin Schmahmann committed
88 89
	protocols []protocol.ID

90
	// DHT protocols we can respond to.
Adin Schmahmann's avatar
Adin Schmahmann committed
91
	serverProtocols []protocol.ID
92

93
	auto   ModeOpt
94 95 96
	mode   mode
	modeLk sync.Mutex

97
	bucketSize int
98
	alpha      int // The concurrency parameter per path
Adin Schmahmann's avatar
Adin Schmahmann committed
99
	beta       int // The number of peers closest to a target that must have responded for a query path to terminate
100

101 102 103
	queryPeerFilter        QueryFilterFunc
	routingTablePeerFilter RouteTableFilterFunc

104 105
	autoRefresh           bool
	rtRefreshQueryTimeout time.Duration
Aarsh Shah's avatar
Aarsh Shah committed
106
	rtRefreshInterval     time.Duration
Steven Allen's avatar
Steven Allen committed
107
	triggerRtRefresh      chan chan<- error
Aarsh Shah's avatar
Aarsh Shah committed
108
	triggerSelfLookup     chan chan<- error
Aarsh Shah's avatar
Aarsh Shah committed
109 110

	maxRecordAge time.Duration
111

112 113 114
	// Allows disabling dht subsystems. These should _only_ be set on
	// "forked" DHTs (e.g., DHTs with custom protocols and/or private
	// networks).
115
	enableProviders, enableValues bool
116

Aarsh Shah's avatar
Aarsh Shah committed
117 118 119
	// successfulOutboundQueryGracePeriod is the maximum grace period we will give to a peer
	// to between two successful query responses from it, failing which,
	// we will ping it to see if it's alive.
120
	successfulOutboundQueryGracePeriod time.Duration
121 122

	fixLowPeersChan chan struct{}
123 124
}

Matt Joiner's avatar
Matt Joiner committed
125 126 127 128
// Assert that IPFS assumptions about interfaces aren't broken. These aren't a
// guarantee, but we can use them to aid refactoring.
var (
	_ routing.ContentRouting = (*IpfsDHT)(nil)
129
	_ routing.Routing        = (*IpfsDHT)(nil)
Matt Joiner's avatar
Matt Joiner committed
130 131 132 133 134
	_ routing.PeerRouting    = (*IpfsDHT)(nil)
	_ routing.PubKeyFetcher  = (*IpfsDHT)(nil)
	_ routing.ValueStore     = (*IpfsDHT)(nil)
)

135
// New creates a new DHT with the specified host and options.
Aarsh Shah's avatar
Aarsh Shah committed
136 137 138
// Please note that being connected to a DHT peer does not necessarily imply that it's also in the DHT Routing Table.
// If the Routing Table has more than "minRTRefreshThreshold" peers, we consider a peer as a Routing Table candidate ONLY when
// we successfully get a query response from it OR if it send us a query.
139 140
func New(ctx context.Context, h host.Host, options ...Option) (*IpfsDHT, error) {
	var cfg config
Adin Schmahmann's avatar
Adin Schmahmann committed
141
	if err := cfg.apply(append([]Option{defaults}, options...)...); err != nil {
142 143
		return nil, err
	}
144 145 146
	if err := cfg.applyFallbacks(h); err != nil {
		return nil, err
	}
Aarsh Shah's avatar
Aarsh Shah committed
147

Adin Schmahmann's avatar
Adin Schmahmann committed
148 149
	if err := cfg.validate(); err != nil {
		return nil, err
150
	}
Aarsh Shah's avatar
Aarsh Shah committed
151 152 153 154
	dht, err := makeDHT(ctx, h, cfg)
	if err != nil {
		return nil, fmt.Errorf("failed to create DHT, err=%s", err)
	}
Adin Schmahmann's avatar
Adin Schmahmann committed
155

156
	dht.autoRefresh = cfg.routingTable.autoRefresh
Aarsh Shah's avatar
Aarsh Shah committed
157
	dht.rtRefreshInterval = cfg.routingTable.refreshInterval
158
	dht.rtRefreshQueryTimeout = cfg.routingTable.refreshQueryTimeout
159

160 161 162
	dht.maxRecordAge = cfg.maxRecordAge
	dht.enableProviders = cfg.enableProviders
	dht.enableValues = cfg.enableValues
Aarsh Shah's avatar
Aarsh Shah committed
163

164
	dht.Validator = cfg.validator
165

166
	dht.auto = cfg.mode
167
	switch cfg.mode {
168
	case ModeAuto, ModeClient:
169
		dht.mode = modeClient
170
	case ModeAutoServer, ModeServer:
171 172
		dht.mode = modeServer
	default:
173
		return nil, fmt.Errorf("invalid dht mode %d", cfg.mode)
174 175 176 177 178
	}

	if dht.mode == modeServer {
		if err := dht.moveToServerMode(); err != nil {
			return nil, err
179
		}
180
	}
181 182 183 184 185 186 187 188 189 190

	// register for event bus and network notifications
	sn, err := newSubscriberNotifiee(dht)
	if err != nil {
		return nil, err
	}
	dht.proc.Go(sn.subscribe)
	// handle providers
	dht.proc.AddChild(dht.ProviderManager.Process())

Aarsh Shah's avatar
Aarsh Shah committed
191
	dht.startSelfLookup()
192
	dht.startRefreshing()
Aarsh Shah's avatar
Aarsh Shah committed
193 194 195 196

	// go-routine to make sure we ALWAYS have RT peer addresses in the peerstore
	// since RT membership is decoupled from connectivity
	go dht.persistRTPeersInPeerStore()
197 198 199 200

	// listens to the fix low peers chan and tries to fix the Routing Table
	dht.proc.Go(dht.fixLowPeersRoutine)

201 202
	return dht, nil
}
203

204 205 206 207
// NewDHT creates a new DHT object with the given peer as the 'local' host.
// IpfsDHT's initialized with this function will respond to DHT requests,
// whereas IpfsDHT's initialized with NewDHTClient will not.
func NewDHT(ctx context.Context, h host.Host, dstore ds.Batching) *IpfsDHT {
208
	dht, err := New(ctx, h, Datastore(dstore))
209 210 211 212 213 214 215 216 217 218 219
	if err != nil {
		panic(err)
	}
	return dht
}

// NewDHTClient creates a new DHT object with the given peer as the 'local'
// host. IpfsDHT clients initialized with this function will not respond to DHT
// requests. If you need a peer to respond to DHT requests, use NewDHT instead.
// NewDHTClient creates a new DHT object with the given peer as the 'local' host
func NewDHTClient(ctx context.Context, h host.Host, dstore ds.Batching) *IpfsDHT {
Adin Schmahmann's avatar
Adin Schmahmann committed
220
	dht, err := New(ctx, h, Datastore(dstore), Mode(ModeClient))
221 222 223
	if err != nil {
		panic(err)
	}
224 225 226
	return dht
}

Aarsh Shah's avatar
Aarsh Shah committed
227
func makeDHT(ctx context.Context, h host.Host, cfg config) (*IpfsDHT, error) {
228
	var protocols, serverProtocols []protocol.ID
Adin Schmahmann's avatar
Adin Schmahmann committed
229 230

	// check if custom test protocols were set
231
	if cfg.v1CompatibleMode {
232 233 234 235 236 237 238 239 240 241 242 243 244 245
		// In compat mode, query/serve using the old protocol.
		//
		// DO NOT accept requests on the new protocol. Otherwise:
		// 1. We'll end up in V2 routing tables.
		// 2. We'll have V1 peers in our routing table.
		//
		// In other words, we'll pollute the V2 network.
		protocols = []protocol.ID{cfg.protocolPrefix + kad1}
		serverProtocols = []protocol.ID{cfg.protocolPrefix + kad1}
	} else {
		// In v2 mode, serve on both protocols, but only
		// query/accept peers in v2 mode.
		protocols = []protocol.ID{cfg.protocolPrefix + kad2}
		serverProtocols = []protocol.ID{cfg.protocolPrefix + kad2, cfg.protocolPrefix + kad1}
Adin Schmahmann's avatar
Adin Schmahmann committed
246 247
	}

248
	dht := &IpfsDHT{
249 250 251 252 253 254 255 256 257 258 259 260 261 262 263
		datastore:              cfg.datastore,
		self:                   h.ID(),
		peerstore:              h.Peerstore(),
		host:                   h,
		strmap:                 make(map[peer.ID]*messageSender),
		birth:                  time.Now(),
		protocols:              protocols,
		serverProtocols:        serverProtocols,
		bucketSize:             cfg.bucketSize,
		alpha:                  cfg.concurrency,
		beta:                   cfg.resiliency,
		triggerRtRefresh:       make(chan chan<- error),
		triggerSelfLookup:      make(chan chan<- error),
		queryPeerFilter:        cfg.queryPeerFilter,
		routingTablePeerFilter: cfg.routingTable.peerFilter,
264
		fixLowPeersChan:        make(chan struct{}),
Jeromy's avatar
Jeromy committed
265
	}
266

Aarsh Shah's avatar
Aarsh Shah committed
267 268 269 270 271 272 273
	// construct routing table
	rt, err := makeRoutingTable(dht, cfg)
	if err != nil {
		return nil, fmt.Errorf("failed to construct routing table,err=%s", err)
	}
	dht.routingTable = rt

274 275
	// create a DHT proc with the given context
	dht.proc = goprocessctx.WithContext(ctx)
Aarsh Shah's avatar
Aarsh Shah committed
276 277 278 279 280 281

	// create a tagged context derived from the original context
	ctxTags := dht.newContextWithLocalTags(ctx)
	// the DHT context should be done when the process is closed
	dht.ctx = goprocessctx.WithProcessClosing(ctxTags, dht.proc)

Alan Shaw's avatar
Alan Shaw committed
282 283 284 285 286
	pm, err := providers.NewProviderManager(dht.ctx, h.ID(), cfg.datastore, cfg.providersOptions...)
	if err != nil {
		return nil, err
	}
	dht.ProviderManager = pm
287

Aarsh Shah's avatar
Aarsh Shah committed
288
	return dht, nil
Jeromy's avatar
Jeromy committed
289 290
}

Aarsh Shah's avatar
Aarsh Shah committed
291
func makeRoutingTable(dht *IpfsDHT, cfg config) (*kb.RoutingTable, error) {
Aarsh Shah's avatar
Aarsh Shah committed
292 293 294 295 296 297
	// The threshold is calculated based on the expected amount of time that should pass before we
	// query a peer as part of our refresh cycle.
	// To grok the Math Wizardy that produced these exact equations, please be patient as a document explaining it will
	// be published soon.
	l1 := math.Log(float64(1) / float64(defaultBucketSize))                              //(Log(1/K))
	l2 := math.Log(float64(1) - (float64(cfg.concurrency) / float64(defaultBucketSize))) // Log(1 - (alpha / K))
298
	maxLastSuccessfulOutboundThreshold := time.Duration(l1 / l2 * float64(cfg.routingTable.refreshInterval))
Aarsh Shah's avatar
Aarsh Shah committed
299

Aarsh Shah's avatar
Aarsh Shah committed
300
	self := kb.ConvertPeerID(dht.host.ID())
Aarsh Shah's avatar
Aarsh Shah committed
301

Aarsh Shah's avatar
Aarsh Shah committed
302
	rt, err := kb.NewRoutingTable(cfg.bucketSize, self, time.Minute, dht.host.Peerstore(), maxLastSuccessfulOutboundThreshold)
Aarsh Shah's avatar
Aarsh Shah committed
303
	dht.successfulOutboundQueryGracePeriod = maxLastSuccessfulOutboundThreshold
Aarsh Shah's avatar
Aarsh Shah committed
304
	cmgr := dht.host.ConnManager()
Aarsh Shah's avatar
Aarsh Shah committed
305 306 307 308 309 310 311

	rt.PeerAdded = func(p peer.ID) {
		commonPrefixLen := kb.CommonPrefixLen(self, kb.ConvertPeerID(p))
		cmgr.TagPeer(p, "kbucket", BaseConnMgrScore+commonPrefixLen)
	}
	rt.PeerRemoved = func(p peer.ID) {
		cmgr.UntagPeer(p, "kbucket")
312 313 314

		// try to fix the RT
		dht.fixRTIfNeeded()
Aarsh Shah's avatar
Aarsh Shah committed
315 316 317 318
	}

	return rt, err
}
319

Will Scott's avatar
Will Scott committed
320 321 322 323 324
// Mode allows introspection of the operation mode of the DHT.
// It returns the ModeOpt stored in dht.auto, which New sets from the
// configuration; it does not consult the internal dht.mode field.
func (dht *IpfsDHT) Mode() ModeOpt {
	return dht.auto
}

325 326 327 328 329 330 331 332 333 334 335 336 337 338 339 340 341 342 343 344 345 346 347 348 349 350
// fixLowPeersRoutine waits for signals on fixLowPeersChan and, whenever the
// routing table holds no more than minRTRefreshThreshold peers, tries to
// refill it: every peer the host's network currently reports is offered to
// peerFound, and a routing table refresh is requested if auto-refresh is on.
// It runs until proc starts closing.
func (dht *IpfsDHT) fixLowPeersRoutine(proc goprocess.Process) {
	for {
		select {
		case <-proc.Closing():
			return
		case <-dht.fixLowPeersChan:
		}

		// Enough peers already: nothing to fix.
		if dht.routingTable.Size() > minRTRefreshThreshold {
			continue
		}

		connected := dht.host.Network().Peers()
		for i := range connected {
			dht.peerFound(dht.Context(), connected[i], false)
		}

		if !dht.autoRefresh {
			continue
		}
		// Non-blocking: skip the trigger if nobody is ready to receive it.
		select {
		case dht.triggerRtRefresh <- nil:
		default:
		}
	}
}

Aarsh Shah's avatar
Aarsh Shah committed
351 352 353 354 355 356 357 358 359 360 361 362 363 364 365 366 367 368 369
// persistRTPeersInPeerStore periodically calls UpdateAddrs on the peerstore
// for every peer in the routing table, passing RecentlyConnectedAddrTTL as
// both TTL arguments. It ticks three times per RecentlyConnectedAddrTTL and
// returns once the DHT context is done.
//
// TODO: this is a hack and SHOULD be removed once
// https://github.com/libp2p/go-libp2p/issues/800 goes in.
func (dht *IpfsDHT) persistRTPeersInPeerStore() {
	const ttl = peerstore.RecentlyConnectedAddrTTL

	ticker := time.NewTicker(ttl / 3)
	defer ticker.Stop()

	for {
		select {
		case <-dht.ctx.Done():
			return
		case <-ticker.C:
			for _, id := range dht.routingTable.ListPeers() {
				dht.peerstore.UpdateAddrs(id, ttl, ttl)
			}
		}
	}
}

Jeromy's avatar
Jeromy committed
370
// putValueToPeer stores the given key/value pair at the peer 'p'
371 372
func (dht *IpfsDHT) putValueToPeer(ctx context.Context, p peer.ID, rec *recpb.Record) error {
	pmes := pb.NewMessage(pb.Message_PUT_VALUE, rec.Key, 0)
373
	pmes.Record = rec
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
374
	rpmes, err := dht.sendRequest(ctx, p, pmes)
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
375
	if err != nil {
Steven Allen's avatar
Steven Allen committed
376
		logger.Debugw("failed to put value to peer", "to", p, "key", loggableKeyBytes(rec.Key), "error", err)
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
377 378
		return err
	}
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
379

380
	if !bytes.Equal(rpmes.GetRecord().Value, pmes.GetRecord().Value) {
Steven Allen's avatar
Steven Allen committed
381
		logger.Infow("value not put correctly", "put-message", pmes, "get-message", rpmes)
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
382 383
		return errors.New("value not put correctly")
	}
gpestana's avatar
gpestana committed
384

Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
385
	return nil
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
386 387
}

388 389
// errInvalidRecord is the sentinel error getValueOrPeers returns when the
// record it received fails validation.
var errInvalidRecord = errors.New("received invalid record")

390 391
// getValueOrPeers queries a particular peer p for the value for
// key. It returns either the value or a list of closer peers.
392
// NOTE: It will update the dht's peerstore with any new addresses
393
// it finds for the given peer.
394
func (dht *IpfsDHT) getValueOrPeers(ctx context.Context, p peer.ID, key string) (*recpb.Record, []*peer.AddrInfo, error) {
395
	pmes, err := dht.getValueSingle(ctx, p, key)
396
	if err != nil {
397
		return nil, nil, err
398 399
	}

400 401 402
	// Perhaps we were given closer peers
	peers := pb.PBPeersToPeerInfos(pmes.GetCloserPeers())

403
	if record := pmes.GetRecord(); record != nil {
404
		// Success! We were given the value
Steven Allen's avatar
Steven Allen committed
405
		logger.Debug("got value")
406

407
		// make sure record is valid.
408
		err = dht.Validator.Validate(string(record.GetKey()), record.GetValue())
409
		if err != nil {
Steven Allen's avatar
Steven Allen committed
410
			logger.Debug("received invalid record (discarded)")
411 412
			// return a sentinal to signify an invalid record was received
			err = errInvalidRecord
George Antoniadis's avatar
George Antoniadis committed
413
			record = new(recpb.Record)
414
		}
415
		return record, peers, err
416
	}
417

418 419 420 421
	if len(peers) > 0 {
		return nil, peers, nil
	}

422
	return nil, nil, routing.ErrNotFound
423 424
}

425
// getValueSingle simply performs the get value RPC with the given parameters
426
func (dht *IpfsDHT) getValueSingle(ctx context.Context, p peer.ID, key string) (*pb.Message, error) {
427
	pmes := pb.NewMessage(pb.Message_GET_VALUE, []byte(key), 0)
Steven Allen's avatar
Steven Allen committed
428
	return dht.sendRequest(ctx, p, pmes)
Jeromy's avatar
Jeromy committed
429 430
}

431
// getLocal attempts to retrieve the value from the datastore
432
func (dht *IpfsDHT) getLocal(key string) (*recpb.Record, error) {
Steven Allen's avatar
Steven Allen committed
433 434
	logger.Debugw("finding value in datastore", "key", loggableKeyString(key))

435
	rec, err := dht.getRecordFromDatastore(mkDsKey(key))
436
	if err != nil {
Steven Allen's avatar
Steven Allen committed
437
		logger.Warnw("get local failed", "key", key, "error", err)
438 439
		return nil, err
	}
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
440

441
	// Double check the key. Can't hurt.
442
	if rec != nil && string(rec.GetKey()) != key {
Steven Allen's avatar
Steven Allen committed
443
		logger.Errorw("BUG: found a DHT record that didn't match it's key", "expected", key, "got", rec.GetKey())
Steven Allen's avatar
Steven Allen committed
444
		return nil, nil
445 446

	}
447
	return rec, nil
448 449
}

450
// putLocal stores the key value pair in the datastore
451
func (dht *IpfsDHT) putLocal(key string, rec *recpb.Record) error {
452 453
	data, err := proto.Marshal(rec)
	if err != nil {
Steven Allen's avatar
Steven Allen committed
454
		logger.Warnw("failed to put marshal record for local put", "error", err, "key", key)
455 456 457
		return err
	}

458
	return dht.datastore.Put(mkDsKey(key), data)
459
}
460

Aarsh Shah's avatar
Aarsh Shah committed
461
// peerFound signals the routingTable that we've found a peer that
Aarsh Shah's avatar
Aarsh Shah committed
462
// might support the DHT protocol.
Aarsh Shah's avatar
Aarsh Shah committed
463 464 465 466 467 468 469 470 471 472 473
// If we have a connection a peer but no exchange of a query RPC ->
//    LastQueriedAt=time.Now (so we don't ping it for some time for a liveliness check)
//    LastUsefulAt=N/A
// If we connect to a peer and exchange a query RPC ->
//    LastQueriedAt=time.Now (same reason as above)
//    LastUsefulAt=time.Now (so we give it some life in the RT without immediately evicting it)
// If we query a peer we already have in our Routing Table ->
//    LastQueriedAt=time.Now()
//    LastUsefulAt remains unchanged
// If we connect to a peer we already have in the RT but do not exchange a query (rare)
//    Do Nothing.
Aarsh Shah's avatar
Aarsh Shah committed
474
func (dht *IpfsDHT) peerFound(ctx context.Context, p peer.ID, queryPeer bool) {
475 476 477
	if c := baseLogger.Check(zap.DebugLevel, "peer found"); c != nil {
		c.Write(zap.String("peer", p.String()))
	}
Aarsh Shah's avatar
Aarsh Shah committed
478 479
	b, err := dht.validRTPeer(p)
	if err != nil {
Steven Allen's avatar
Steven Allen committed
480
		logger.Errorw("failed to validate if peer is a DHT peer", "peer", p, "error", err)
Aarsh Shah's avatar
Aarsh Shah committed
481
	} else if b {
Aarsh Shah's avatar
Aarsh Shah committed
482
		newlyAdded, err := dht.routingTable.TryAddPeer(p, queryPeer)
Steven Allen's avatar
Steven Allen committed
483 484 485 486
		if err != nil {
			// peer not added.
			return
		}
Aarsh Shah's avatar
Aarsh Shah committed
487

Aarsh Shah's avatar
Aarsh Shah committed
488
		// If we freshly added the peer because of a query, we need to ensure we override the "zero" lastUsefulAt
Aarsh Shah's avatar
Aarsh Shah committed
489
		// value that must have been set in the Routing Table for this peer when it was first added during a connection.
Aarsh Shah's avatar
Aarsh Shah committed
490 491 492 493 494 495
		if newlyAdded && queryPeer {
			dht.routingTable.UpdateLastUsefulAt(p, time.Now())
		} else if queryPeer {
			// the peer is already in our RT, but we just successfully queried it and so let's give it a
			// bump on the query time so we don't ping it too soon for a liveliness check.
			dht.routingTable.UpdateLastSuccessfulOutboundQueryAt(p, time.Now())
Aarsh Shah's avatar
Aarsh Shah committed
496 497
		}
	}
Aarsh Shah's avatar
Aarsh Shah committed
498 499
}

Aarsh Shah's avatar
Aarsh Shah committed
500
// peerStoppedDHT signals the routing table that a peer is unable to responsd to DHT queries anymore.
Aarsh Shah's avatar
Aarsh Shah committed
501
func (dht *IpfsDHT) peerStoppedDHT(ctx context.Context, p peer.ID) {
Steven Allen's avatar
Steven Allen committed
502
	logger.Debugw("peer stopped dht", "peer", p)
Aarsh Shah's avatar
Aarsh Shah committed
503 504
	// A peer that does not support the DHT protocol is dead for us.
	// There's no point in talking to anymore till it starts supporting the DHT protocol again.
Aarsh Shah's avatar
Aarsh Shah committed
505 506 507
	dht.routingTable.RemovePeer(p)

	// since we lost a peer from the RT, we should do this here
508
	dht.fixRTIfNeeded()
Aarsh Shah's avatar
Aarsh Shah committed
509 510
}

511 512 513 514
func (dht *IpfsDHT) fixRTIfNeeded() {
	select {
	case dht.fixLowPeersChan <- struct{}{}:
	default:
Aarsh Shah's avatar
Aarsh Shah committed
515
	}
516
}
Jeromy's avatar
Jeromy committed
517

Jeromy's avatar
Jeromy committed
518
// FindLocal looks for a peer with a given ID connected to this dht and returns the peer and the table it was found in.
519
func (dht *IpfsDHT) FindLocal(id peer.ID) peer.AddrInfo {
520
	switch dht.host.Network().Connectedness(id) {
521
	case network.Connected, network.CanConnect:
522 523
		return dht.peerstore.PeerInfo(id)
	default:
524
		return peer.AddrInfo{}
Jeromy's avatar
Jeromy committed
525 526
	}
}
527

Jeromy's avatar
Jeromy committed
528
// findPeerSingle asks peer 'p' if they know where the peer with id 'id' is
529
func (dht *IpfsDHT) findPeerSingle(ctx context.Context, p peer.ID, id peer.ID) (*pb.Message, error) {
530
	pmes := pb.NewMessage(pb.Message_FIND_NODE, []byte(id), 0)
Steven Allen's avatar
Steven Allen committed
531
	return dht.sendRequest(ctx, p, pmes)
532
}
533

Adin Schmahmann's avatar
Adin Schmahmann committed
534 535
func (dht *IpfsDHT) findProvidersSingle(ctx context.Context, p peer.ID, key multihash.Multihash) (*pb.Message, error) {
	pmes := pb.NewMessage(pb.Message_GET_PROVIDERS, key, 0)
Steven Allen's avatar
Steven Allen committed
536
	return dht.sendRequest(ctx, p, pmes)
Jeromy's avatar
Jeromy committed
537 538
}

539
// nearestPeersToQuery returns the routing tables closest peers.
540
func (dht *IpfsDHT) nearestPeersToQuery(pmes *pb.Message, count int) []peer.ID {
541
	closer := dht.routingTable.NearestPeers(kb.ConvertKey(string(pmes.GetKey())), count)
542 543 544
	return closer
}

Aarsh Shah's avatar
Aarsh Shah committed
545
// betterPeersToQuery returns nearestPeersToQuery with some additional filtering
546
func (dht *IpfsDHT) betterPeersToQuery(pmes *pb.Message, from peer.ID, count int) []peer.ID {
547
	closer := dht.nearestPeersToQuery(pmes, count)
548 549 550

	// no node? nil
	if closer == nil {
Steven Allen's avatar
Steven Allen committed
551
		logger.Infow("no closer peers to send", from)
552 553 554
		return nil
	}

Steven Allen's avatar
Steven Allen committed
555
	filtered := make([]peer.ID, 0, len(closer))
Jeromy's avatar
Jeromy committed
556 557 558
	for _, clp := range closer {

		// == to self? thats bad
Jeromy's avatar
Jeromy committed
559
		if clp == dht.self {
Matt Joiner's avatar
Matt Joiner committed
560
			logger.Error("BUG betterPeersToQuery: attempted to return self! this shouldn't happen...")
561 562
			return nil
		}
563
		// Dont send a peer back themselves
564
		if clp == from {
565 566 567
			continue
		}

Jeromy's avatar
Jeromy committed
568
		filtered = append(filtered, clp)
569 570
	}

571 572
	// ok seems like closer nodes
	return filtered
573 574
}

575 576 577 578 579 580 581 582 583 584 585 586 587 588 589 590 591 592
// setMode switches the DHT to mode m while holding modeLk. It is a no-op if
// the DHT is already in that mode, and returns an error for a mode other
// than modeServer or modeClient.
func (dht *IpfsDHT) setMode(m mode) error {
	dht.modeLk.Lock()
	defer dht.modeLk.Unlock()

	if dht.mode == m {
		return nil
	}
	if m == modeServer {
		return dht.moveToServerMode()
	}
	if m == modeClient {
		return dht.moveToClientMode()
	}
	return fmt.Errorf("unrecognized dht mode: %d", m)
}

Adin Schmahmann's avatar
Adin Schmahmann committed
593 594 595
// moveToServerMode advertises (via libp2p identify updates) that we are able to respond to DHT queries and sets the appropriate stream handlers.
// Note: We may support responding to queries with protocols aside from our primary ones in order to support
// interoperability with older versions of the DHT protocol.
596 597
func (dht *IpfsDHT) moveToServerMode() error {
	dht.mode = modeServer
Adin Schmahmann's avatar
Adin Schmahmann committed
598
	for _, p := range dht.serverProtocols {
599 600 601 602 603
		dht.host.SetStreamHandler(p, dht.handleNewStream)
	}
	return nil
}

Adin Schmahmann's avatar
Adin Schmahmann committed
604 605 606 607 608
// moveToClientMode stops advertising (and rescinds advertisements via libp2p identify updates) that we are able to
// respond to DHT queries and removes the appropriate stream handlers. We also kill all inbound streams that were
// utilizing the handled protocols.
// Note: We may support responding to queries with protocols aside from our primary ones in order to support
// interoperability with older versions of the DHT protocol.
609 610
func (dht *IpfsDHT) moveToClientMode() error {
	dht.mode = modeClient
Adin Schmahmann's avatar
Adin Schmahmann committed
611
	for _, p := range dht.serverProtocols {
612 613 614 615
		dht.host.RemoveStreamHandler(p)
	}

	pset := make(map[protocol.ID]bool)
Adin Schmahmann's avatar
Adin Schmahmann committed
616
	for _, p := range dht.serverProtocols {
617 618 619 620 621 622 623
		pset[p] = true
	}

	for _, c := range dht.host.Network().Conns() {
		for _, s := range c.GetStreams() {
			if pset[s.Protocol()] {
				if s.Stat().Direction == network.DirInbound {
Steven Allen's avatar
Steven Allen committed
624
					_ = s.Reset()
625 626 627 628 629 630 631 632 633 634 635 636 637
				}
			}
		}
	}
	return nil
}

// getMode returns the DHT's current internal mode, reading it while holding
// modeLk.
func (dht *IpfsDHT) getMode() mode {
	dht.modeLk.Lock()
	defer dht.modeLk.Unlock()
	return dht.mode
}

Alan Shaw's avatar
Alan Shaw committed
638
// Context returns the DHT's context.
639 640 641 642
func (dht *IpfsDHT) Context() context.Context {
	return dht.ctx
}

Alan Shaw's avatar
Alan Shaw committed
643
// Process returns the DHT's process.
644 645 646 647
func (dht *IpfsDHT) Process() goprocess.Process {
	return dht.proc
}

Alan Shaw's avatar
Alan Shaw committed
648
// RoutingTable returns the DHT's routingTable.
ZhengQi's avatar
ZhengQi committed
649 650 651 652
func (dht *IpfsDHT) RoutingTable() *kb.RoutingTable {
	return dht.routingTable
}

Alan Shaw's avatar
Alan Shaw committed
653
// Close calls Process Close.
654 655 656
func (dht *IpfsDHT) Close() error {
	return dht.proc.Close()
}
657 658 659 660

// mkDsKey converts the record key s into a datastore key by encoding its
// bytes with base32.RawStdEncoding.
func mkDsKey(s string) ds.Key {
	return ds.NewKey(base32.RawStdEncoding.EncodeToString([]byte(s)))
}
661

Alan Shaw's avatar
Alan Shaw committed
662
// PeerID returns the DHT node's Peer ID.
663 664 665 666
func (dht *IpfsDHT) PeerID() peer.ID {
	return dht.self
}

Alan Shaw's avatar
Alan Shaw committed
667
// PeerKey returns a DHT key, converted from the DHT node's Peer ID.
668 669 670 671
func (dht *IpfsDHT) PeerKey() []byte {
	return kb.ConvertPeerID(dht.self)
}

Alan Shaw's avatar
Alan Shaw committed
672
// Host returns the libp2p host this DHT is operating with.
673 674 675 676
func (dht *IpfsDHT) Host() host.Host {
	return dht.host
}

Alan Shaw's avatar
Alan Shaw committed
677
// Ping sends a ping message to the passed peer and waits for a response.
678 679 680 681
func (dht *IpfsDHT) Ping(ctx context.Context, p peer.ID) error {
	req := pb.NewMessage(pb.Message_PING, nil, 0)
	resp, err := dht.sendRequest(ctx, p, req)
	if err != nil {
Steven Allen's avatar
Steven Allen committed
682
		return fmt.Errorf("sending request: %w", err)
683 684
	}
	if resp.Type != pb.Message_PING {
Steven Allen's avatar
Steven Allen committed
685
		return fmt.Errorf("got unexpected response type: %v", resp.Type)
686 687 688
	}
	return nil
}
689 690 691 692 693 694 695 696 697 698 699 700 701 702 703 704

// newContextWithLocalTags derives a context from ctx that carries the
// metrics tags for this DHT: the peer ID (metrics.KeyPeerID) and an instance
// ID built from the DHT's pointer (metrics.KeyInstanceID). Any extraTags are
// applied first, followed by these two.
func (dht *IpfsDHT) newContextWithLocalTags(ctx context.Context, extraTags ...tag.Mutator) context.Context {
	peerTag := tag.Upsert(metrics.KeyPeerID, dht.self.Pretty())
	instanceTag := tag.Upsert(metrics.KeyInstanceID, fmt.Sprintf("%p", dht))
	extraTags = append(extraTags, peerTag, instanceTag)

	// The error is ignored as it is unrelated to the actual function of this code.
	ctx, _ = tag.New(ctx, extraTags...)
	return ctx
}