dht.go 24.1 KB
Newer Older
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
1 2
package dht

3
import (
Jeromy's avatar
Jeromy committed
4
	"context"
5
	"fmt"
Aarsh Shah's avatar
Aarsh Shah committed
6
	"math"
7
	"math/rand"
8 9
	"sync"
	"time"
10

11 12 13 14 15 16
	"github.com/libp2p/go-libp2p-core/host"
	"github.com/libp2p/go-libp2p-core/network"
	"github.com/libp2p/go-libp2p-core/peer"
	"github.com/libp2p/go-libp2p-core/peerstore"
	"github.com/libp2p/go-libp2p-core/protocol"
	"github.com/libp2p/go-libp2p-core/routing"
17

18
	"github.com/libp2p/go-libp2p-kad-dht/internal"
19
	"github.com/libp2p/go-libp2p-kad-dht/metrics"
20
	pb "github.com/libp2p/go-libp2p-kad-dht/pb"
Aarsh Shah's avatar
Aarsh Shah committed
21
	"github.com/libp2p/go-libp2p-kad-dht/providers"
22
	"github.com/libp2p/go-libp2p-kad-dht/rtrefresh"
Aarsh Shah's avatar
Aarsh Shah committed
23
	kb "github.com/libp2p/go-libp2p-kbucket"
24
	"github.com/libp2p/go-libp2p-kbucket/peerdiversity"
Aarsh Shah's avatar
Aarsh Shah committed
25 26
	record "github.com/libp2p/go-libp2p-record"
	recpb "github.com/libp2p/go-libp2p-record/pb"
27

Aarsh Shah's avatar
Aarsh Shah committed
28
	"github.com/gogo/protobuf/proto"
29 30
	ds "github.com/ipfs/go-datastore"
	logging "github.com/ipfs/go-log"
Aarsh Shah's avatar
Aarsh Shah committed
31
	"github.com/jbenet/goprocess"
Henrique Dias's avatar
Henrique Dias committed
32
	goprocessctx "github.com/jbenet/goprocess/context"
Steven Allen's avatar
Steven Allen committed
33
	"github.com/multiformats/go-base32"
34
	ma "github.com/multiformats/go-multiaddr"
35 36
	"go.opencensus.io/tag"
	"go.uber.org/zap"
37 38
)

39 40 41
var (
	logger     = logging.Logger("dht")
	baseLogger = logger.Desugar()
Aarsh Shah's avatar
Aarsh Shah committed
42 43

	rtFreezeTimeout = 1 * time.Minute
44
)
45

Steven Allen's avatar
Steven Allen committed
46 47 48 49 50 51
// baseConnMgrScore is the score attached through the connection manager's "kbucket"
// tag to routing table peers that are tagged rather than protected (see makeRoutingTable).
const baseConnMgrScore = 5
Henrique Dias's avatar
Henrique Dias committed
52

53 54 55
// mode is the DHT's internal operating mode: server (answers queries) or client.
// The zero value matches neither mode.
type mode int

const (
	modeServer mode = 1
	modeClient mode = 2
)

Adin Schmahmann's avatar
Adin Schmahmann committed
60 61 62 63
// kad1 is the protocol suffix for version 1.0.0 of the Kademlia DHT protocol.
// makeDHT prepends the configured protocol prefix to it.
const kad1 protocol.ID = "/kad/1.0.0"

64
// kbucketTag is the connection manager tag (and protection key) applied to routing table peers.
const kbucketTag = "kbucket"

// protectedBuckets is the common-prefix-length cut-off below which routing table peers
// are protected in the connection manager instead of merely tagged.
const protectedBuckets = 2

Aarsh Shah's avatar
Aarsh Shah committed
69 70 71 72 73
// addPeerRTReq asks rtPeerLoop, via addPeerToRTChan, to try adding a peer to the routing table.
type addPeerRTReq struct {
	// p is the candidate peer.
	p peer.ID
	// queryPeer mirrors peerFound's queryPeer argument. When it is set and the peer is
	// already present, rtPeerLoop bumps the peer's last successful outbound query time.
	queryPeer bool
}

74
// IpfsDHT is an implementation of Kademlia with S/Kademlia modifications.
75
// It is used to implement the base Routing module.
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
76
type IpfsDHT struct {
77 78 79
	host      host.Host // the network services we need
	self      peer.ID   // Local peer (yourself)
	selfKey   kb.ID
80
	peerstore peerstore.Peerstore // Peer Registry
81

82
	datastore ds.Datastore // Local data
83

84
	routingTable *kb.RoutingTable // Array of routing tables for differently distanced nodes
85 86
	// ProviderManager stores & manages the provider records for this Dht peer.
	ProviderManager *providers.ProviderManager
87

88 89 90
	// manages Routing Table refresh
	rtRefreshManager *rtrefresh.RtRefreshManager

Aarsh Shah's avatar
Aarsh Shah committed
91
	birth time.Time // When this peer started up
92

93
	Validator record.Validator
94

95 96
	ctx  context.Context
	proc goprocess.Process
97

98
	protoMessenger *pb.ProtocolMessenger
99
	messageMgr     *messageManager
100

Steven Allen's avatar
Steven Allen committed
101
	plk sync.Mutex
102

103 104
	stripedPutLocks [256]sync.Mutex

105 106
	// DHT protocols we query with. We'll only add peers to our routing
	// table if they speak these protocols.
107 108
	protocols     []protocol.ID
	protocolsStrs []string
Adin Schmahmann's avatar
Adin Schmahmann committed
109

110
	// DHT protocols we can respond to.
Adin Schmahmann's avatar
Adin Schmahmann committed
111
	serverProtocols []protocol.ID
112

113
	auto   ModeOpt
114 115 116
	mode   mode
	modeLk sync.Mutex

117
	bucketSize int
118
	alpha      int // The concurrency parameter per path
Adin Schmahmann's avatar
Adin Schmahmann committed
119
	beta       int // The number of peers closest to a target that must have responded for a query path to terminate
120

121 122
	queryPeerFilter        QueryFilterFunc
	routingTablePeerFilter RouteTableFilterFunc
123
	rtPeerDiversityFilter  peerdiversity.PeerIPGroupFilter
124

125
	autoRefresh bool
Aarsh Shah's avatar
Aarsh Shah committed
126

127 128 129 130 131
	// A set of bootstrap peers to fallback on if all other attempts to fix
	// the routing table fail (or, e.g., this is the first time this node is
	// connecting to the network).
	bootstrapPeers []peer.AddrInfo

Aarsh Shah's avatar
Aarsh Shah committed
132
	maxRecordAge time.Duration
133

134 135 136
	// Allows disabling dht subsystems. These should _only_ be set on
	// "forked" DHTs (e.g., DHTs with custom protocols and/or private
	// networks).
137
	enableProviders, enableValues bool
138

Sam's avatar
Sam committed
139 140
	disableFixLowPeers bool
	fixLowPeersChan    chan struct{}
Aarsh Shah's avatar
Aarsh Shah committed
141 142 143 144 145

	addPeerToRTChan   chan addPeerRTReq
	refreshFinishedCh chan struct{}

	rtFreezeTimeout time.Duration
146 147 148

	// configuration variables for tests
	testAddressUpdateProcessing bool
149 150
}

Matt Joiner's avatar
Matt Joiner committed
151 152 153 154
// Assert that IPFS assumptions about interfaces aren't broken. These aren't a
// guarantee, but we can use them to aid refactoring.
var (
	_ routing.ContentRouting = (*IpfsDHT)(nil)
155
	_ routing.Routing        = (*IpfsDHT)(nil)
Matt Joiner's avatar
Matt Joiner committed
156 157 158 159 160
	_ routing.PeerRouting    = (*IpfsDHT)(nil)
	_ routing.PubKeyFetcher  = (*IpfsDHT)(nil)
	_ routing.ValueStore     = (*IpfsDHT)(nil)
)

161
// New creates a new DHT with the specified host and options.
Aarsh Shah's avatar
Aarsh Shah committed
162 163 164
// Please note that being connected to a DHT peer does not necessarily imply that it's also in the DHT Routing Table.
// If the Routing Table has more than "minRTRefreshThreshold" peers, we consider a peer as a Routing Table candidate ONLY when
// we successfully get a query response from it OR if it send us a query.
165 166
func New(ctx context.Context, h host.Host, options ...Option) (*IpfsDHT, error) {
	var cfg config
Adin Schmahmann's avatar
Adin Schmahmann committed
167
	if err := cfg.apply(append([]Option{defaults}, options...)...); err != nil {
168 169
		return nil, err
	}
170 171 172
	if err := cfg.applyFallbacks(h); err != nil {
		return nil, err
	}
Aarsh Shah's avatar
Aarsh Shah committed
173

Adin Schmahmann's avatar
Adin Schmahmann committed
174 175
	if err := cfg.validate(); err != nil {
		return nil, err
176
	}
177

Aarsh Shah's avatar
Aarsh Shah committed
178 179 180 181
	dht, err := makeDHT(ctx, h, cfg)
	if err != nil {
		return nil, fmt.Errorf("failed to create DHT, err=%s", err)
	}
Adin Schmahmann's avatar
Adin Schmahmann committed
182

183
	dht.autoRefresh = cfg.routingTable.autoRefresh
184

185 186 187
	dht.maxRecordAge = cfg.maxRecordAge
	dht.enableProviders = cfg.enableProviders
	dht.enableValues = cfg.enableValues
Sam's avatar
Sam committed
188
	dht.disableFixLowPeers = cfg.disableFixLowPeers
Aarsh Shah's avatar
Aarsh Shah committed
189

190
	dht.Validator = cfg.validator
191 192
	dht.messageMgr = &messageManager{
		host:      h,
193
		strmap:    make(map[peer.ID]*peerMessageSender),
194 195
		protocols: dht.protocols,
	}
196
	dht.protoMessenger, err = pb.NewProtocolMessenger(dht.messageMgr, pb.WithValidator(dht.Validator))
197 198 199
	if err != nil {
		return nil, err
	}
200

201 202
	dht.testAddressUpdateProcessing = cfg.testAddressUpdateProcessing

203
	dht.auto = cfg.mode
204
	switch cfg.mode {
205
	case ModeAuto, ModeClient:
206
		dht.mode = modeClient
207
	case ModeAutoServer, ModeServer:
208 209
		dht.mode = modeServer
	default:
210
		return nil, fmt.Errorf("invalid dht mode %d", cfg.mode)
211 212 213 214 215
	}

	if dht.mode == modeServer {
		if err := dht.moveToServerMode(); err != nil {
			return nil, err
216
		}
217
	}
218 219 220 221 222 223 224 225 226 227

	// register for event bus and network notifications
	sn, err := newSubscriberNotifiee(dht)
	if err != nil {
		return nil, err
	}
	dht.proc.Go(sn.subscribe)
	// handle providers
	dht.proc.AddChild(dht.ProviderManager.Process())

Aarsh Shah's avatar
Aarsh Shah committed
228 229 230
	// go-routine to make sure we ALWAYS have RT peer addresses in the peerstore
	// since RT membership is decoupled from connectivity
	go dht.persistRTPeersInPeerStore()
231

Aarsh Shah's avatar
Aarsh Shah committed
232
	dht.proc.Go(dht.rtPeerLoop)
233

234 235 236 237 238 239 240 241 242
	// Fill routing table with currently connected peers that are DHT servers
	dht.plk.Lock()
	for _, p := range dht.host.Network().Peers() {
		dht.peerFound(dht.ctx, p, false)
	}
	dht.plk.Unlock()

	dht.proc.Go(dht.populatePeers)

243 244
	return dht, nil
}
245

246 247 248 249
// NewDHT creates a new DHT object with the given peer as the 'local' host.
// IpfsDHT's initialized with this function will respond to DHT requests,
// whereas IpfsDHT's initialized with NewDHTClient will not.
func NewDHT(ctx context.Context, h host.Host, dstore ds.Batching) *IpfsDHT {
250
	dht, err := New(ctx, h, Datastore(dstore))
251 252 253 254 255 256 257 258 259 260
	if err != nil {
		panic(err)
	}
	return dht
}

// NewDHTClient creates a new DHT object with the given peer as the 'local'
// host. IpfsDHT clients initialized with this function will not respond to DHT
// requests. If you need a peer to respond to DHT requests, use NewDHT instead.
func NewDHTClient(ctx context.Context, h host.Host, dstore ds.Batching) *IpfsDHT {
Adin Schmahmann's avatar
Adin Schmahmann committed
261
	dht, err := New(ctx, h, Datastore(dstore), Mode(ModeClient))
262 263 264
	if err != nil {
		panic(err)
	}
265 266 267
	return dht
}

Aarsh Shah's avatar
Aarsh Shah committed
268
func makeDHT(ctx context.Context, h host.Host, cfg config) (*IpfsDHT, error) {
269
	var protocols, serverProtocols []protocol.ID
Adin Schmahmann's avatar
Adin Schmahmann committed
270

271
	v1proto := cfg.protocolPrefix + kad1
272 273 274 275 276

	if cfg.v1ProtocolOverride != "" {
		v1proto = cfg.v1ProtocolOverride
	}

277 278
	protocols = []protocol.ID{v1proto}
	serverProtocols = []protocol.ID{v1proto}
Adin Schmahmann's avatar
Adin Schmahmann committed
279

280
	dht := &IpfsDHT{
281 282
		datastore:              cfg.datastore,
		self:                   h.ID(),
283
		selfKey:                kb.ConvertPeerID(h.ID()),
284 285 286 287
		peerstore:              h.Peerstore(),
		host:                   h,
		birth:                  time.Now(),
		protocols:              protocols,
288
		protocolsStrs:          protocol.ConvertToStrings(protocols),
289 290 291 292 293 294
		serverProtocols:        serverProtocols,
		bucketSize:             cfg.bucketSize,
		alpha:                  cfg.concurrency,
		beta:                   cfg.resiliency,
		queryPeerFilter:        cfg.queryPeerFilter,
		routingTablePeerFilter: cfg.routingTable.peerFilter,
295 296 297
		rtPeerDiversityFilter:  cfg.routingTable.diversityFilter,

		fixLowPeersChan: make(chan struct{}, 1),
Aarsh Shah's avatar
Aarsh Shah committed
298 299 300

		addPeerToRTChan:   make(chan addPeerRTReq),
		refreshFinishedCh: make(chan struct{}),
Jeromy's avatar
Jeromy committed
301
	}
302

303 304 305 306 307 308 309 310 311 312 313 314 315 316
	var maxLastSuccessfulOutboundThreshold time.Duration

	// The threshold is calculated based on the expected amount of time that should pass before we
	// query a peer as part of our refresh cycle.
	// To grok the Math Wizardy that produced these exact equations, please be patient as a document explaining it will
	// be published soon.
	if cfg.concurrency < cfg.bucketSize { // (alpha < K)
		l1 := math.Log(float64(1) / float64(cfg.bucketSize))                              //(Log(1/K))
		l2 := math.Log(float64(1) - (float64(cfg.concurrency) / float64(cfg.bucketSize))) // Log(1 - (alpha / K))
		maxLastSuccessfulOutboundThreshold = time.Duration(l1 / l2 * float64(cfg.routingTable.refreshInterval))
	} else {
		maxLastSuccessfulOutboundThreshold = cfg.routingTable.refreshInterval
	}

Aarsh Shah's avatar
Aarsh Shah committed
317
	// construct routing table
318 319
	// use twice the theoritical usefulness threhold to keep older peers around longer
	rt, err := makeRoutingTable(dht, cfg, 2*maxLastSuccessfulOutboundThreshold)
Aarsh Shah's avatar
Aarsh Shah committed
320 321 322 323
	if err != nil {
		return nil, fmt.Errorf("failed to construct routing table,err=%s", err)
	}
	dht.routingTable = rt
324
	dht.bootstrapPeers = cfg.bootstrapPeers
Aarsh Shah's avatar
Aarsh Shah committed
325

326 327 328 329 330 331 332
	// rt refresh manager
	rtRefresh, err := makeRtRefreshManager(dht, cfg, maxLastSuccessfulOutboundThreshold)
	if err != nil {
		return nil, fmt.Errorf("failed to construct RT Refresh Manager,err=%s", err)
	}
	dht.rtRefreshManager = rtRefresh

333
	// create a DHT proc with the given context
334 335 336
	dht.proc = goprocessctx.WithContextAndTeardown(ctx, func() error {
		return rtRefresh.Close()
	})
Aarsh Shah's avatar
Aarsh Shah committed
337 338 339 340 341 342

	// create a tagged context derived from the original context
	ctxTags := dht.newContextWithLocalTags(ctx)
	// the DHT context should be done when the process is closed
	dht.ctx = goprocessctx.WithProcessClosing(ctxTags, dht.proc)

Alan Shaw's avatar
Alan Shaw committed
343 344 345 346 347
	pm, err := providers.NewProviderManager(dht.ctx, h.ID(), cfg.datastore, cfg.providersOptions...)
	if err != nil {
		return nil, err
	}
	dht.ProviderManager = pm
348

Aarsh Shah's avatar
Aarsh Shah committed
349 350
	dht.rtFreezeTimeout = rtFreezeTimeout

Aarsh Shah's avatar
Aarsh Shah committed
351
	return dht, nil
Jeromy's avatar
Jeromy committed
352 353
}

354 355 356 357 358
func makeRtRefreshManager(dht *IpfsDHT, cfg config, maxLastSuccessfulOutboundThreshold time.Duration) (*rtrefresh.RtRefreshManager, error) {
	keyGenFnc := func(cpl uint) (string, error) {
		p, err := dht.routingTable.GenRandPeerID(cpl)
		return string(p), err
	}
Aarsh Shah's avatar
Aarsh Shah committed
359

360 361 362 363 364 365 366 367 368 369 370
	queryFnc := func(ctx context.Context, key string) error {
		_, err := dht.GetClosestPeers(ctx, key)
		return err
	}

	r, err := rtrefresh.NewRtRefreshManager(
		dht.host, dht.routingTable, cfg.routingTable.autoRefresh,
		keyGenFnc,
		queryFnc,
		cfg.routingTable.refreshQueryTimeout,
		cfg.routingTable.refreshInterval,
Aarsh Shah's avatar
Aarsh Shah committed
371 372
		maxLastSuccessfulOutboundThreshold,
		dht.refreshFinishedCh)
373 374 375 376 377

	return r, err
}

func makeRoutingTable(dht *IpfsDHT, cfg config, maxLastSuccessfulOutboundThreshold time.Duration) (*kb.RoutingTable, error) {
378 379 380 381 382 383 384 385 386 387 388 389 390 391 392
	// make a Routing Table Diversity Filter
	var filter *peerdiversity.Filter
	if dht.rtPeerDiversityFilter != nil {
		df, err := peerdiversity.NewFilter(dht.rtPeerDiversityFilter, "rt/diversity", func(p peer.ID) int {
			return kb.CommonPrefixLen(dht.selfKey, kb.ConvertPeerID(p))
		})

		if err != nil {
			return nil, fmt.Errorf("failed to construct peer diversity filter: %w", err)
		}

		filter = df
	}

	rt, err := kb.NewRoutingTable(cfg.bucketSize, dht.selfKey, time.Minute, dht.host.Peerstore(), maxLastSuccessfulOutboundThreshold, filter)
393 394 395 396
	if err != nil {
		return nil, err
	}

Aarsh Shah's avatar
Aarsh Shah committed
397
	cmgr := dht.host.ConnManager()
Aarsh Shah's avatar
Aarsh Shah committed
398 399

	rt.PeerAdded = func(p peer.ID) {
400
		commonPrefixLen := kb.CommonPrefixLen(dht.selfKey, kb.ConvertPeerID(p))
401 402 403 404 405
		if commonPrefixLen < protectedBuckets {
			cmgr.Protect(p, kbucketTag)
		} else {
			cmgr.TagPeer(p, kbucketTag, baseConnMgrScore)
		}
Aarsh Shah's avatar
Aarsh Shah committed
406 407
	}
	rt.PeerRemoved = func(p peer.ID) {
408
		cmgr.Unprotect(p, kbucketTag)
409
		cmgr.UntagPeer(p, kbucketTag)
410 411 412

		// try to fix the RT
		dht.fixRTIfNeeded()
Aarsh Shah's avatar
Aarsh Shah committed
413 414 415 416
	}

	return rt, err
}
417

418
// GetRoutingTableDiversityStats returns the diversity stats for the Routing Table.
419 420
func (dht *IpfsDHT) GetRoutingTableDiversityStats() []peerdiversity.CplDiversityStats {
	return dht.routingTable.GetDiversityStats()
421 422
}

Will Scott's avatar
Will Scott committed
423 424 425 426 427
// Mode returns the ModeOpt the DHT was configured with (for example ModeAuto). This is
// not necessarily the client/server mode it is currently operating in.
func (dht *IpfsDHT) Mode() ModeOpt {
	configured := dht.auto
	return configured
}

Sam's avatar
Sam committed
428 429 430 431 432 433 434 435 436 437 438 439 440 441 442 443
// populatePeers runs once as a DHT child process. Unless low-peer fixing is disabled, it
// first runs fixLowPeers. It then starts the routing table refresh manager, logging any
// start error. Finally, if low-peer fixing is enabled, it launches fixLowPeersRoutine.
func (dht *IpfsDHT) populatePeers(_ goprocess.Process) {
	fixEnabled := !dht.disableFixLowPeers

	if fixEnabled {
		dht.fixLowPeers(dht.ctx)
	}

	if err := dht.rtRefreshManager.Start(); err != nil {
		logger.Error(err)
	}

	if !fixEnabled {
		return
	}
	// keep servicing fix requests for the lifetime of the DHT process
	dht.proc.Go(dht.fixLowPeersRoutine)
}

444
// fixLowPeersRouting manages simultaneous requests to fixLowPeers
445
func (dht *IpfsDHT) fixLowPeersRoutine(proc goprocess.Process) {
446 447
	ticker := time.NewTicker(periodicBootstrapInterval)
	defer ticker.Stop()
448

449 450 451
	for {
		select {
		case <-dht.fixLowPeersChan:
452
		case <-ticker.C:
453 454 455
		case <-proc.Closing():
			return
		}
456

457 458 459 460 461 462 463 464 465 466 467 468 469 470 471 472
		dht.fixLowPeers(dht.Context())
	}

}

// fixLowPeers tries to get more peers into the routing table if we're below the threshold
func (dht *IpfsDHT) fixLowPeers(ctx context.Context) {
	if dht.routingTable.Size() > minRTRefreshThreshold {
		return
	}

	// we try to add all peers we are connected to to the Routing Table
	// in case they aren't already there.
	for _, p := range dht.host.Network().Peers() {
		dht.peerFound(ctx, p, false)
	}
473

474 475 476 477 478 479 480 481
	// TODO Active Bootstrapping
	// We should first use non-bootstrap peers we knew of from previous
	// snapshots of the Routing Table before we connect to the bootstrappers.
	// See https://github.com/libp2p/go-libp2p-kad-dht/issues/387.
	if dht.routingTable.Size() == 0 {
		if len(dht.bootstrapPeers) == 0 {
			// No point in continuing, we have no peers!
			return
482 483
		}

484 485 486 487 488 489 490 491
		found := 0
		for _, i := range rand.Perm(len(dht.bootstrapPeers)) {
			ai := dht.bootstrapPeers[i]
			err := dht.Host().Connect(ctx, ai)
			if err == nil {
				found++
			} else {
				logger.Warnw("failed to bootstrap", "peer", ai.ID, "error", err)
492 493
			}

494 495 496 497 498 499 500 501 502 503 504
			// Wait for two bootstrap peers, or try them all.
			//
			// Why two? In theory, one should be enough
			// normally. However, if the network were to
			// restart and everyone connected to just one
			// bootstrapper, we'll end up with a mostly
			// partitioned network.
			//
			// So we always bootstrap with two random peers.
			if found == maxNBoostrappers {
				break
505 506
			}
		}
507
	}
508

509 510 511 512
	// if we still don't have peers in our routing table(probably because Identify hasn't completed),
	// there is no point in triggering a Refresh.
	if dht.routingTable.Size() == 0 {
		return
513 514
	}

515 516 517
	if dht.autoRefresh {
		dht.rtRefreshManager.RefreshNoWait()
	}
518 519
}

Aarsh Shah's avatar
Aarsh Shah committed
520 521 522 523 524 525 526 527 528 529 530 531 532 533 534 535 536 537 538
// persistRTPeersInPeerStore keeps addresses of routing table peers alive in the peerstore,
// because routing table membership is decoupled from connectivity. Every third of
// RecentlyConnectedAddrTTL it re-applies that TTL to each routing table peer's addresses
// currently held with it, pushing their expiry forward. It returns when the DHT context is done.
//
// This is a workaround and SHOULD be removed once
// https://github.com/libp2p/go-libp2p/issues/800 goes in.
func (dht *IpfsDHT) persistRTPeersInPeerStore() {
	ttl := peerstore.RecentlyConnectedAddrTTL
	ticker := time.NewTicker(ttl / 3)
	defer ticker.Stop()

	for {
		select {
		case <-dht.ctx.Done():
			return
		case <-ticker.C:
		}

		for _, p := range dht.routingTable.ListPeers() {
			dht.peerstore.UpdateAddrs(p, ttl, ttl)
		}
	}
}

539
// getLocal attempts to retrieve the value from the datastore
540
func (dht *IpfsDHT) getLocal(key string) (*recpb.Record, error) {
541
	logger.Debugw("finding value in datastore", "key", internal.LoggableRecordKeyString(key))
Steven Allen's avatar
Steven Allen committed
542

543
	rec, err := dht.getRecordFromDatastore(mkDsKey(key))
544
	if err != nil {
545
		logger.Warnw("get local failed", "key", internal.LoggableRecordKeyString(key), "error", err)
546 547
		return nil, err
	}
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
548

549
	// Double check the key. Can't hurt.
550
	if rec != nil && string(rec.GetKey()) != key {
551
		logger.Errorw("BUG: found a DHT record that didn't match it's key", "expected", internal.LoggableRecordKeyString(key), "got", rec.GetKey())
Steven Allen's avatar
Steven Allen committed
552
		return nil, nil
553 554

	}
555
	return rec, nil
556 557
}

558
// putLocal stores the key value pair in the datastore
559
func (dht *IpfsDHT) putLocal(key string, rec *recpb.Record) error {
560 561
	data, err := proto.Marshal(rec)
	if err != nil {
562
		logger.Warnw("failed to put marshal record for local put", "error", err, "key", internal.LoggableRecordKeyString(key))
563 564 565
		return err
	}

566
	return dht.datastore.Put(mkDsKey(key), data)
567
}
568

Aarsh Shah's avatar
Aarsh Shah committed
569 570 571 572 573 574 575 576 577 578 579 580 581 582 583 584 585 586 587 588 589 590 591 592 593 594 595 596 597 598 599 600 601 602 603 604 605 606 607 608 609 610 611 612
// rtPeerLoop is the goroutine that applies add requests from peerFound to the routing
// table until proc closes. It also tracks a bootstrapping phase and a freeze timer:
//   - The phase starts whenever a request arrives while the table is empty. This resets the
//     refresh count and drops any pending freeze timer.
//   - While bootstrapping, the flag is passed through to TryAddPeer.
//   - The first completed refresh ends the phase and immediately requests another refresh.
//   - rtFreezeTimeout after the second completed refresh, all peers are marked irreplaceable.
func (dht *IpfsDHT) rtPeerLoop(proc goprocess.Process) {
	var (
		refreshesDone int
		bootstrapping bool
		freezeCh      <-chan time.Time
	)

	for {
		select {
		case <-freezeCh:
			dht.routingTable.MarkAllPeersIrreplaceable()

		case req := <-dht.addPeerToRTChan:
			if dht.routingTable.Size() == 0 {
				bootstrapping = true
				refreshesDone = 0
				freezeCh = nil
			}

			added, err := dht.routingTable.TryAddPeer(req.p, req.queryPeer, bootstrapping)
			if err != nil {
				// peer not added.
				continue
			}
			if req.queryPeer && !added {
				// Already in the table, but we just queried it successfully: bump its query
				// time so we don't ping it too soon for a liveliness check.
				dht.routingTable.UpdateLastSuccessfulOutboundQueryAt(req.p, time.Now())
			}

		case <-dht.refreshFinishedCh:
			refreshesDone++
			if refreshesDone == 2 {
				freezeCh = time.NewTimer(dht.rtFreezeTimeout).C
			}

			if bootstrapping {
				bootstrapping = false
				dht.rtRefreshManager.RefreshNoWait()
			}

		case <-proc.Closing():
			return
		}
	}
}

Aarsh Shah's avatar
Aarsh Shah committed
613
// peerFound signals the routingTable that we've found a peer that
Aarsh Shah's avatar
Aarsh Shah committed
614
// might support the DHT protocol.
Aarsh Shah's avatar
Aarsh Shah committed
615 616
// If we have a connection a peer but no exchange of a query RPC ->
//    LastQueriedAt=time.Now (so we don't ping it for some time for a liveliness check)
617 618
//    LastUsefulAt=0
// If we connect to a peer and then exchange a query RPC ->
Aarsh Shah's avatar
Aarsh Shah committed
619 620 621 622 623 624 625
//    LastQueriedAt=time.Now (same reason as above)
//    LastUsefulAt=time.Now (so we give it some life in the RT without immediately evicting it)
// If we query a peer we already have in our Routing Table ->
//    LastQueriedAt=time.Now()
//    LastUsefulAt remains unchanged
// If we connect to a peer we already have in the RT but do not exchange a query (rare)
//    Do Nothing.
Aarsh Shah's avatar
Aarsh Shah committed
626
func (dht *IpfsDHT) peerFound(ctx context.Context, p peer.ID, queryPeer bool) {
627 628 629
	if c := baseLogger.Check(zap.DebugLevel, "peer found"); c != nil {
		c.Write(zap.String("peer", p.String()))
	}
Aarsh Shah's avatar
Aarsh Shah committed
630 631
	b, err := dht.validRTPeer(p)
	if err != nil {
Steven Allen's avatar
Steven Allen committed
632
		logger.Errorw("failed to validate if peer is a DHT peer", "peer", p, "error", err)
Aarsh Shah's avatar
Aarsh Shah committed
633
	} else if b {
Aarsh Shah's avatar
Aarsh Shah committed
634 635 636
		select {
		case dht.addPeerToRTChan <- addPeerRTReq{p, queryPeer}:
		case <-dht.ctx.Done():
Steven Allen's avatar
Steven Allen committed
637 638
			return
		}
Aarsh Shah's avatar
Aarsh Shah committed
639
	}
Aarsh Shah's avatar
Aarsh Shah committed
640 641
}

Aarsh Shah's avatar
Aarsh Shah committed
642
// peerStoppedDHT signals the routing table that a peer is unable to responsd to DHT queries anymore.
Aarsh Shah's avatar
Aarsh Shah committed
643
func (dht *IpfsDHT) peerStoppedDHT(ctx context.Context, p peer.ID) {
Steven Allen's avatar
Steven Allen committed
644
	logger.Debugw("peer stopped dht", "peer", p)
Aarsh Shah's avatar
Aarsh Shah committed
645 646
	// A peer that does not support the DHT protocol is dead for us.
	// There's no point in talking to anymore till it starts supporting the DHT protocol again.
Aarsh Shah's avatar
Aarsh Shah committed
647
	dht.routingTable.RemovePeer(p)
Aarsh Shah's avatar
Aarsh Shah committed
648 649
}

650 651 652 653
func (dht *IpfsDHT) fixRTIfNeeded() {
	select {
	case dht.fixLowPeersChan <- struct{}{}:
	default:
Aarsh Shah's avatar
Aarsh Shah committed
654
	}
655
}
Jeromy's avatar
Jeromy committed
656

Jeromy's avatar
Jeromy committed
657
// FindLocal looks for a peer with a given ID connected to this dht and returns the peer and the table it was found in.
658
func (dht *IpfsDHT) FindLocal(id peer.ID) peer.AddrInfo {
659
	switch dht.host.Network().Connectedness(id) {
660
	case network.Connected, network.CanConnect:
661 662
		return dht.peerstore.PeerInfo(id)
	default:
663
		return peer.AddrInfo{}
Jeromy's avatar
Jeromy committed
664 665
	}
}
666

667
// nearestPeersToQuery returns the routing tables closest peers.
668
func (dht *IpfsDHT) nearestPeersToQuery(pmes *pb.Message, count int) []peer.ID {
669
	closer := dht.routingTable.NearestPeers(kb.ConvertKey(string(pmes.GetKey())), count)
670 671 672
	return closer
}

Aarsh Shah's avatar
Aarsh Shah committed
673
// betterPeersToQuery returns nearestPeersToQuery with some additional filtering
674
func (dht *IpfsDHT) betterPeersToQuery(pmes *pb.Message, from peer.ID, count int) []peer.ID {
675
	closer := dht.nearestPeersToQuery(pmes, count)
676 677 678

	// no node? nil
	if closer == nil {
Steven Allen's avatar
Steven Allen committed
679
		logger.Infow("no closer peers to send", from)
680 681 682
		return nil
	}

Steven Allen's avatar
Steven Allen committed
683
	filtered := make([]peer.ID, 0, len(closer))
Jeromy's avatar
Jeromy committed
684 685 686
	for _, clp := range closer {

		// == to self? thats bad
Jeromy's avatar
Jeromy committed
687
		if clp == dht.self {
Matt Joiner's avatar
Matt Joiner committed
688
			logger.Error("BUG betterPeersToQuery: attempted to return self! this shouldn't happen...")
689 690
			return nil
		}
691
		// Dont send a peer back themselves
692
		if clp == from {
693 694 695
			continue
		}

Jeromy's avatar
Jeromy committed
696
		filtered = append(filtered, clp)
697 698
	}

699 700
	// ok seems like closer nodes
	return filtered
701 702
}

703 704 705 706 707 708 709 710 711 712 713 714 715 716 717 718 719 720
// setMode switches the DHT to mode m while holding modeLk. It does nothing if the DHT
// is already in m, and returns an error for a mode it does not recognize.
func (dht *IpfsDHT) setMode(m mode) error {
	dht.modeLk.Lock()
	defer dht.modeLk.Unlock()

	switch {
	case m == dht.mode:
		return nil
	case m == modeServer:
		return dht.moveToServerMode()
	case m == modeClient:
		return dht.moveToClientMode()
	}
	return fmt.Errorf("unrecognized dht mode: %d", m)
}

Adin Schmahmann's avatar
Adin Schmahmann committed
721 722 723
// moveToServerMode advertises (via libp2p identify updates) that we are able to respond to DHT queries and sets the appropriate stream handlers.
// Note: We may support responding to queries with protocols aside from our primary ones in order to support
// interoperability with older versions of the DHT protocol.
724 725
func (dht *IpfsDHT) moveToServerMode() error {
	dht.mode = modeServer
Adin Schmahmann's avatar
Adin Schmahmann committed
726
	for _, p := range dht.serverProtocols {
727 728 729 730 731
		dht.host.SetStreamHandler(p, dht.handleNewStream)
	}
	return nil
}

Adin Schmahmann's avatar
Adin Schmahmann committed
732 733 734 735 736
// moveToClientMode stops advertising (and rescinds advertisements via libp2p identify updates) that we are able to
// respond to DHT queries and removes the appropriate stream handlers. We also kill all inbound streams that were
// utilizing the handled protocols.
// Note: We may support responding to queries with protocols aside from our primary ones in order to support
// interoperability with older versions of the DHT protocol.
737 738
func (dht *IpfsDHT) moveToClientMode() error {
	dht.mode = modeClient
Adin Schmahmann's avatar
Adin Schmahmann committed
739
	for _, p := range dht.serverProtocols {
740 741 742 743
		dht.host.RemoveStreamHandler(p)
	}

	pset := make(map[protocol.ID]bool)
Adin Schmahmann's avatar
Adin Schmahmann committed
744
	for _, p := range dht.serverProtocols {
745 746 747 748 749 750 751
		pset[p] = true
	}

	for _, c := range dht.host.Network().Conns() {
		for _, s := range c.GetStreams() {
			if pset[s.Protocol()] {
				if s.Stat().Direction == network.DirInbound {
Steven Allen's avatar
Steven Allen committed
752
					_ = s.Reset()
753 754 755 756 757 758 759 760 761 762 763 764 765
				}
			}
		}
	}
	return nil
}

// getMode returns the DHT's current operating mode, read under modeLk.
func (dht *IpfsDHT) getMode() mode {
	dht.modeLk.Lock()
	current := dht.mode
	dht.modeLk.Unlock()
	return current
}

Alan Shaw's avatar
Alan Shaw committed
766
// Context returns the DHT's context.
767 768 769 770
func (dht *IpfsDHT) Context() context.Context {
	return dht.ctx
}

Alan Shaw's avatar
Alan Shaw committed
771
// Process returns the DHT's process.
772 773 774 775
func (dht *IpfsDHT) Process() goprocess.Process {
	return dht.proc
}

Alan Shaw's avatar
Alan Shaw committed
776
// RoutingTable returns the DHT's routingTable.
ZhengQi's avatar
ZhengQi committed
777 778 779 780
func (dht *IpfsDHT) RoutingTable() *kb.RoutingTable {
	return dht.routingTable
}

Alan Shaw's avatar
Alan Shaw committed
781
// Close calls Process Close.
782 783 784
func (dht *IpfsDHT) Close() error {
	return dht.proc.Close()
}
785 786 787 788

// mkDsKey maps a DHT record key to a datastore key. It base32-encodes the raw key bytes
// (without padding), so arbitrary binary keys become plain datastore key strings.
func mkDsKey(s string) ds.Key {
	encoded := base32.RawStdEncoding.EncodeToString([]byte(s))
	return ds.NewKey(encoded)
}
789

Alan Shaw's avatar
Alan Shaw committed
790
// PeerID returns the DHT node's Peer ID.
791 792 793 794
func (dht *IpfsDHT) PeerID() peer.ID {
	return dht.self
}

Alan Shaw's avatar
Alan Shaw committed
795
// PeerKey returns a DHT key, converted from the DHT node's Peer ID.
796 797 798 799
func (dht *IpfsDHT) PeerKey() []byte {
	return kb.ConvertPeerID(dht.self)
}

Alan Shaw's avatar
Alan Shaw committed
800
// Host returns the libp2p host this DHT is operating with.
801 802 803 804
func (dht *IpfsDHT) Host() host.Host {
	return dht.host
}

Alan Shaw's avatar
Alan Shaw committed
805
// Ping sends a ping message to the passed peer and waits for a response.
806
func (dht *IpfsDHT) Ping(ctx context.Context, p peer.ID) error {
807
	return dht.protoMessenger.Ping(ctx, p)
808
}
809 810 811 812 813 814 815 816 817 818 819 820 821 822 823 824

// newContextWithLocalTags returns a copy of ctx tagged with this DHT's PeerID and
// InstanceID metric keys, plus any extraTags the caller supplies. The caller's
// extraTags slice is never modified.
func (dht *IpfsDHT) newContextWithLocalTags(ctx context.Context, extraTags ...tag.Mutator) context.Context {
	// Build into a fresh slice. Appending to extraTags directly could write into the
	// caller's backing array when they call f(ctx, tags...) with spare capacity.
	mutators := make([]tag.Mutator, 0, len(extraTags)+2)
	mutators = append(mutators, extraTags...)
	mutators = append(mutators,
		tag.Upsert(metrics.KeyPeerID, dht.self.Pretty()),
		tag.Upsert(metrics.KeyInstanceID, fmt.Sprintf("%p", dht)),
	)
	// Ignoring the error: tagging is unrelated to the actual function of this code.
	ctx, _ = tag.New(ctx, mutators...)
	return ctx
}
825 826 827 828 829 830 831 832

// maybeAddAddrs records addrs for p in the peerstore with the given ttl. It skips
// ourselves and peers we are currently connected to, since we already hold better
// addresses for those.
func (dht *IpfsDHT) maybeAddAddrs(p peer.ID, addrs []ma.Multiaddr, ttl time.Duration) {
	if p != dht.self && dht.host.Network().Connectedness(p) != network.Connected {
		dht.peerstore.AddAddrs(p, addrs, ttl)
	}
}