dht.go 25.1 KB
Newer Older
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
1 2
package dht

3
import (
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
4
	"bytes"
Jeromy's avatar
Jeromy committed
5
	"context"
6
	"errors"
7
	"fmt"
Aarsh Shah's avatar
Aarsh Shah committed
8
	"math"
9
	"math/rand"
10 11
	"sync"
	"time"
12

13 14 15 16 17 18
	"github.com/libp2p/go-libp2p-core/host"
	"github.com/libp2p/go-libp2p-core/network"
	"github.com/libp2p/go-libp2p-core/peer"
	"github.com/libp2p/go-libp2p-core/peerstore"
	"github.com/libp2p/go-libp2p-core/protocol"
	"github.com/libp2p/go-libp2p-core/routing"
19

20
	"github.com/libp2p/go-libp2p-kad-dht/metrics"
21
	pb "github.com/libp2p/go-libp2p-kad-dht/pb"
Aarsh Shah's avatar
Aarsh Shah committed
22
	"github.com/libp2p/go-libp2p-kad-dht/providers"
23
	"github.com/libp2p/go-libp2p-kad-dht/rtrefresh"
Aarsh Shah's avatar
Aarsh Shah committed
24
	kb "github.com/libp2p/go-libp2p-kbucket"
25
	"github.com/libp2p/go-libp2p-kbucket/peerdiversity"
Aarsh Shah's avatar
Aarsh Shah committed
26 27
	record "github.com/libp2p/go-libp2p-record"
	recpb "github.com/libp2p/go-libp2p-record/pb"
28

Aarsh Shah's avatar
Aarsh Shah committed
29
	"github.com/gogo/protobuf/proto"
30 31
	ds "github.com/ipfs/go-datastore"
	logging "github.com/ipfs/go-log"
Aarsh Shah's avatar
Aarsh Shah committed
32
	"github.com/jbenet/goprocess"
Henrique Dias's avatar
Henrique Dias committed
33
	goprocessctx "github.com/jbenet/goprocess/context"
Steven Allen's avatar
Steven Allen committed
34
	"github.com/multiformats/go-base32"
35
	ma "github.com/multiformats/go-multiaddr"
Adin Schmahmann's avatar
Adin Schmahmann committed
36
	"github.com/multiformats/go-multihash"
37 38
	"go.opencensus.io/tag"
	"go.uber.org/zap"
39 40
)

41 42 43 44
// logger is the sugared logger registered under the "dht" subsystem name.
var logger = logging.Logger("dht")

// baseLogger is the desugared form of logger, used where the level is
// checked before any log fields are built.
var baseLogger = logger.Desugar()
45

Steven Allen's avatar
Steven Allen committed
46 47 48 49 50 51
// baseConnMgrScore is the score this package hands to the connection manager
// when tagging a routing-table peer under the "kbucket" tag.
const baseConnMgrScore = 5
Henrique Dias's avatar
Henrique Dias committed
52

53 54 55
// mode is the operating mode the DHT is currently running in.
type mode int

// Operating modes. Values start at 1, so the zero value of mode is neither
// of them.
const (
	modeServer mode = 1
	modeClient mode = 2
)

Adin Schmahmann's avatar
Adin Schmahmann committed
60 61 62 63 64
const (
	kad1 protocol.ID = "/kad/1.0.0"
	kad2 protocol.ID = "/kad/2.0.0"
)

65
const (
66 67
	kbucketTag       = "kbucket"
	protectedBuckets = 2
68 69
)

70
// IpfsDHT is an implementation of Kademlia with S/Kademlia modifications.
71
// It is used to implement the base Routing module.
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
72
type IpfsDHT struct {
73 74 75
	host      host.Host // the network services we need
	self      peer.ID   // Local peer (yourself)
	selfKey   kb.ID
76
	peerstore peerstore.Peerstore // Peer Registry
77

78
	datastore ds.Datastore // Local data
79

80
	routingTable *kb.RoutingTable // Array of routing tables for differently distanced nodes
81 82
	// ProviderManager stores & manages the provider records for this Dht peer.
	ProviderManager *providers.ProviderManager
83

84 85 86
	// manages Routing Table refresh
	rtRefreshManager *rtrefresh.RtRefreshManager

Aarsh Shah's avatar
Aarsh Shah committed
87
	birth time.Time // When this peer started up
88

89
	Validator record.Validator
90

91 92
	ctx  context.Context
	proc goprocess.Process
93 94 95

	strmap map[peer.ID]*messageSender
	smlk   sync.Mutex
96

Steven Allen's avatar
Steven Allen committed
97
	plk sync.Mutex
98

99 100
	stripedPutLocks [256]sync.Mutex

101 102
	// DHT protocols we query with. We'll only add peers to our routing
	// table if they speak these protocols.
103 104
	protocols     []protocol.ID
	protocolsStrs []string
Adin Schmahmann's avatar
Adin Schmahmann committed
105

106
	// DHT protocols we can respond to.
Adin Schmahmann's avatar
Adin Schmahmann committed
107
	serverProtocols []protocol.ID
108

109
	auto   ModeOpt
110 111 112
	mode   mode
	modeLk sync.Mutex

113
	bucketSize int
114
	alpha      int // The concurrency parameter per path
Adin Schmahmann's avatar
Adin Schmahmann committed
115
	beta       int // The number of peers closest to a target that must have responded for a query path to terminate
116

117 118
	queryPeerFilter        QueryFilterFunc
	routingTablePeerFilter RouteTableFilterFunc
119
	rtPeerDiversityFilter  peerdiversity.PeerIPGroupFilter
120

121
	autoRefresh bool
Aarsh Shah's avatar
Aarsh Shah committed
122

123 124 125 126 127
	// A set of bootstrap peers to fallback on if all other attempts to fix
	// the routing table fail (or, e.g., this is the first time this node is
	// connecting to the network).
	bootstrapPeers []peer.AddrInfo

Aarsh Shah's avatar
Aarsh Shah committed
128
	maxRecordAge time.Duration
129

130 131 132
	// Allows disabling dht subsystems. These should _only_ be set on
	// "forked" DHTs (e.g., DHTs with custom protocols and/or private
	// networks).
133
	enableProviders, enableValues bool
134 135

	fixLowPeersChan chan struct{}
136 137
}

Matt Joiner's avatar
Matt Joiner committed
138 139 140 141
// Assert that IPFS assumptions about interfaces aren't broken. These aren't a
// guarantee, but we can use them to aid refactoring.
var (
	_ routing.ContentRouting = (*IpfsDHT)(nil)
142
	_ routing.Routing        = (*IpfsDHT)(nil)
Matt Joiner's avatar
Matt Joiner committed
143 144 145 146 147
	_ routing.PeerRouting    = (*IpfsDHT)(nil)
	_ routing.PubKeyFetcher  = (*IpfsDHT)(nil)
	_ routing.ValueStore     = (*IpfsDHT)(nil)
)

148
// New creates a new DHT with the specified host and options.
Aarsh Shah's avatar
Aarsh Shah committed
149 150 151
// Please note that being connected to a DHT peer does not necessarily imply that it's also in the DHT Routing Table.
// If the Routing Table has more than "minRTRefreshThreshold" peers, we consider a peer as a Routing Table candidate ONLY when
// we successfully get a query response from it OR if it send us a query.
152 153
func New(ctx context.Context, h host.Host, options ...Option) (*IpfsDHT, error) {
	var cfg config
Adin Schmahmann's avatar
Adin Schmahmann committed
154
	if err := cfg.apply(append([]Option{defaults}, options...)...); err != nil {
155 156
		return nil, err
	}
157 158 159
	if err := cfg.applyFallbacks(h); err != nil {
		return nil, err
	}
Aarsh Shah's avatar
Aarsh Shah committed
160

Adin Schmahmann's avatar
Adin Schmahmann committed
161 162
	if err := cfg.validate(); err != nil {
		return nil, err
163
	}
164

Aarsh Shah's avatar
Aarsh Shah committed
165 166 167 168
	dht, err := makeDHT(ctx, h, cfg)
	if err != nil {
		return nil, fmt.Errorf("failed to create DHT, err=%s", err)
	}
Adin Schmahmann's avatar
Adin Schmahmann committed
169

170
	dht.autoRefresh = cfg.routingTable.autoRefresh
171

172 173 174
	dht.maxRecordAge = cfg.maxRecordAge
	dht.enableProviders = cfg.enableProviders
	dht.enableValues = cfg.enableValues
Aarsh Shah's avatar
Aarsh Shah committed
175

176
	dht.Validator = cfg.validator
177

178
	dht.auto = cfg.mode
179
	switch cfg.mode {
180
	case ModeAuto, ModeClient:
181
		dht.mode = modeClient
182
	case ModeAutoServer, ModeServer:
183 184
		dht.mode = modeServer
	default:
185
		return nil, fmt.Errorf("invalid dht mode %d", cfg.mode)
186 187 188 189 190
	}

	if dht.mode == modeServer {
		if err := dht.moveToServerMode(); err != nil {
			return nil, err
191
		}
192
	}
193 194 195 196 197 198 199 200 201 202

	// register for event bus and network notifications
	sn, err := newSubscriberNotifiee(dht)
	if err != nil {
		return nil, err
	}
	dht.proc.Go(sn.subscribe)
	// handle providers
	dht.proc.AddChild(dht.ProviderManager.Process())

203 204 205
	if err := dht.rtRefreshManager.Start(); err != nil {
		return nil, err
	}
Aarsh Shah's avatar
Aarsh Shah committed
206 207 208 209

	// go-routine to make sure we ALWAYS have RT peer addresses in the peerstore
	// since RT membership is decoupled from connectivity
	go dht.persistRTPeersInPeerStore()
210 211 212 213

	// listens to the fix low peers chan and tries to fix the Routing Table
	dht.proc.Go(dht.fixLowPeersRoutine)

214 215
	return dht, nil
}
216

217 218 219 220
// NewDHT creates a new DHT object with the given peer as the 'local' host.
// IpfsDHT's initialized with this function will respond to DHT requests,
// whereas IpfsDHT's initialized with NewDHTClient will not.
func NewDHT(ctx context.Context, h host.Host, dstore ds.Batching) *IpfsDHT {
221
	dht, err := New(ctx, h, Datastore(dstore))
222 223 224 225 226 227 228 229 230 231 232
	if err != nil {
		panic(err)
	}
	return dht
}

// NewDHTClient creates a new DHT object with the given peer as the 'local'
// host. IpfsDHT clients initialized with this function will not respond to DHT
// requests. If you need a peer to respond to DHT requests, use NewDHT instead.
// NewDHTClient creates a new DHT object with the given peer as the 'local' host
func NewDHTClient(ctx context.Context, h host.Host, dstore ds.Batching) *IpfsDHT {
Adin Schmahmann's avatar
Adin Schmahmann committed
233
	dht, err := New(ctx, h, Datastore(dstore), Mode(ModeClient))
234 235 236
	if err != nil {
		panic(err)
	}
237 238 239
	return dht
}

Aarsh Shah's avatar
Aarsh Shah committed
240
func makeDHT(ctx context.Context, h host.Host, cfg config) (*IpfsDHT, error) {
241
	var protocols, serverProtocols []protocol.ID
Adin Schmahmann's avatar
Adin Schmahmann committed
242 243

	// check if custom test protocols were set
244
	if cfg.v1CompatibleMode {
245 246 247 248 249 250 251 252 253 254 255 256 257 258
		// In compat mode, query/serve using the old protocol.
		//
		// DO NOT accept requests on the new protocol. Otherwise:
		// 1. We'll end up in V2 routing tables.
		// 2. We'll have V1 peers in our routing table.
		//
		// In other words, we'll pollute the V2 network.
		protocols = []protocol.ID{cfg.protocolPrefix + kad1}
		serverProtocols = []protocol.ID{cfg.protocolPrefix + kad1}
	} else {
		// In v2 mode, serve on both protocols, but only
		// query/accept peers in v2 mode.
		protocols = []protocol.ID{cfg.protocolPrefix + kad2}
		serverProtocols = []protocol.ID{cfg.protocolPrefix + kad2, cfg.protocolPrefix + kad1}
Adin Schmahmann's avatar
Adin Schmahmann committed
259 260
	}

261
	dht := &IpfsDHT{
262 263
		datastore:              cfg.datastore,
		self:                   h.ID(),
264
		selfKey:                kb.ConvertPeerID(h.ID()),
265 266 267 268 269
		peerstore:              h.Peerstore(),
		host:                   h,
		strmap:                 make(map[peer.ID]*messageSender),
		birth:                  time.Now(),
		protocols:              protocols,
270
		protocolsStrs:          protocol.ConvertToStrings(protocols),
271 272 273 274 275 276
		serverProtocols:        serverProtocols,
		bucketSize:             cfg.bucketSize,
		alpha:                  cfg.concurrency,
		beta:                   cfg.resiliency,
		queryPeerFilter:        cfg.queryPeerFilter,
		routingTablePeerFilter: cfg.routingTable.peerFilter,
277 278 279
		rtPeerDiversityFilter:  cfg.routingTable.diversityFilter,

		fixLowPeersChan: make(chan struct{}, 1),
Jeromy's avatar
Jeromy committed
280
	}
281

282 283 284 285 286 287 288 289 290 291 292 293 294 295
	var maxLastSuccessfulOutboundThreshold time.Duration

	// The threshold is calculated based on the expected amount of time that should pass before we
	// query a peer as part of our refresh cycle.
	// To grok the Math Wizardy that produced these exact equations, please be patient as a document explaining it will
	// be published soon.
	if cfg.concurrency < cfg.bucketSize { // (alpha < K)
		l1 := math.Log(float64(1) / float64(cfg.bucketSize))                              //(Log(1/K))
		l2 := math.Log(float64(1) - (float64(cfg.concurrency) / float64(cfg.bucketSize))) // Log(1 - (alpha / K))
		maxLastSuccessfulOutboundThreshold = time.Duration(l1 / l2 * float64(cfg.routingTable.refreshInterval))
	} else {
		maxLastSuccessfulOutboundThreshold = cfg.routingTable.refreshInterval
	}

Aarsh Shah's avatar
Aarsh Shah committed
296
	// construct routing table
297 298
	// use twice the theoritical usefulness threhold to keep older peers around longer
	rt, err := makeRoutingTable(dht, cfg, 2*maxLastSuccessfulOutboundThreshold)
Aarsh Shah's avatar
Aarsh Shah committed
299 300 301 302
	if err != nil {
		return nil, fmt.Errorf("failed to construct routing table,err=%s", err)
	}
	dht.routingTable = rt
303
	dht.bootstrapPeers = cfg.bootstrapPeers
Aarsh Shah's avatar
Aarsh Shah committed
304

305 306 307 308 309 310 311
	// rt refresh manager
	rtRefresh, err := makeRtRefreshManager(dht, cfg, maxLastSuccessfulOutboundThreshold)
	if err != nil {
		return nil, fmt.Errorf("failed to construct RT Refresh Manager,err=%s", err)
	}
	dht.rtRefreshManager = rtRefresh

312
	// create a DHT proc with the given context
313 314 315
	dht.proc = goprocessctx.WithContextAndTeardown(ctx, func() error {
		return rtRefresh.Close()
	})
Aarsh Shah's avatar
Aarsh Shah committed
316 317 318 319 320 321

	// create a tagged context derived from the original context
	ctxTags := dht.newContextWithLocalTags(ctx)
	// the DHT context should be done when the process is closed
	dht.ctx = goprocessctx.WithProcessClosing(ctxTags, dht.proc)

Alan Shaw's avatar
Alan Shaw committed
322 323 324 325 326
	pm, err := providers.NewProviderManager(dht.ctx, h.ID(), cfg.datastore, cfg.providersOptions...)
	if err != nil {
		return nil, err
	}
	dht.ProviderManager = pm
327

Aarsh Shah's avatar
Aarsh Shah committed
328
	return dht, nil
Jeromy's avatar
Jeromy committed
329 330
}

331 332 333 334 335
func makeRtRefreshManager(dht *IpfsDHT, cfg config, maxLastSuccessfulOutboundThreshold time.Duration) (*rtrefresh.RtRefreshManager, error) {
	keyGenFnc := func(cpl uint) (string, error) {
		p, err := dht.routingTable.GenRandPeerID(cpl)
		return string(p), err
	}
Aarsh Shah's avatar
Aarsh Shah committed
336

337 338 339 340 341 342 343 344 345 346 347 348 349 350 351 352 353
	queryFnc := func(ctx context.Context, key string) error {
		_, err := dht.GetClosestPeers(ctx, key)
		return err
	}

	r, err := rtrefresh.NewRtRefreshManager(
		dht.host, dht.routingTable, cfg.routingTable.autoRefresh,
		keyGenFnc,
		queryFnc,
		cfg.routingTable.refreshQueryTimeout,
		cfg.routingTable.refreshInterval,
		maxLastSuccessfulOutboundThreshold)

	return r, err
}

func makeRoutingTable(dht *IpfsDHT, cfg config, maxLastSuccessfulOutboundThreshold time.Duration) (*kb.RoutingTable, error) {
354 355 356 357 358 359 360 361 362 363 364 365 366 367 368
	// make a Routing Table Diversity Filter
	var filter *peerdiversity.Filter
	if dht.rtPeerDiversityFilter != nil {
		df, err := peerdiversity.NewFilter(dht.rtPeerDiversityFilter, "rt/diversity", func(p peer.ID) int {
			return kb.CommonPrefixLen(dht.selfKey, kb.ConvertPeerID(p))
		})

		if err != nil {
			return nil, fmt.Errorf("failed to construct peer diversity filter: %w", err)
		}

		filter = df
	}

	rt, err := kb.NewRoutingTable(cfg.bucketSize, dht.selfKey, time.Minute, dht.host.Peerstore(), maxLastSuccessfulOutboundThreshold, filter)
Aarsh Shah's avatar
Aarsh Shah committed
369
	cmgr := dht.host.ConnManager()
Aarsh Shah's avatar
Aarsh Shah committed
370 371

	rt.PeerAdded = func(p peer.ID) {
372
		commonPrefixLen := kb.CommonPrefixLen(dht.selfKey, kb.ConvertPeerID(p))
373 374 375 376 377
		if commonPrefixLen < protectedBuckets {
			cmgr.Protect(p, kbucketTag)
		} else {
			cmgr.TagPeer(p, kbucketTag, baseConnMgrScore)
		}
Aarsh Shah's avatar
Aarsh Shah committed
378 379
	}
	rt.PeerRemoved = func(p peer.ID) {
380
		cmgr.Unprotect(p, kbucketTag)
381
		cmgr.UntagPeer(p, kbucketTag)
382 383 384

		// try to fix the RT
		dht.fixRTIfNeeded()
Aarsh Shah's avatar
Aarsh Shah committed
385 386 387 388
	}

	return rt, err
}
389

390 391 392 393 394
// GetRoutingTableDiversityStats returns the diversity stats for the Routing
// Table, as reported by the routing table itself.
func (dht *IpfsDHT) GetRoutingTableDiversityStats() []peerdiversity.CplDiversityStats {
	stats := dht.routingTable.GetDiversityStats()
	return stats
}

Will Scott's avatar
Will Scott committed
395 396 397 398 399
// Mode allows introspection of the operation mode of the DHT. It reports the
// mode option the DHT was configured with, not the mode it is currently
// operating in.
func (dht *IpfsDHT) Mode() ModeOpt {
	configured := dht.auto
	return configured
}

400
// fixLowPeersRoutine tries to get more peers into the routing table if we're below the threshold
401
func (dht *IpfsDHT) fixLowPeersRoutine(proc goprocess.Process) {
402 403 404
	timer := time.NewTimer(periodicBootstrapInterval)
	defer timer.Stop()

405 406 407
	for {
		select {
		case <-dht.fixLowPeersChan:
408
		case <-timer.C:
409 410 411
		case <-proc.Closing():
			return
		}
412

413 414 415 416
		if dht.routingTable.Size() > minRTRefreshThreshold {
			continue
		}

417 418
		// we try to add all peers we are connected to to the Routing Table
		// in case they aren't already there.
419 420 421 422
		for _, p := range dht.host.Network().Peers() {
			dht.peerFound(dht.Context(), p, false)
		}

423 424 425 426 427 428 429 430 431 432 433 434 435 436 437 438 439 440 441 442 443 444 445 446 447 448 449 450 451 452 453 454 455 456 457 458 459 460 461 462 463
		// TODO Active Bootstrapping
		// We should first use non-bootstrap peers we knew of from previous
		// snapshots of the Routing Table before we connect to the bootstrappers.
		// See https://github.com/libp2p/go-libp2p-kad-dht/issues/387.
		if dht.routingTable.Size() == 0 {
			if len(dht.bootstrapPeers) == 0 {
				// No point in continuing, we have no peers!
				continue
			}

			found := 0
			for _, i := range rand.Perm(len(dht.bootstrapPeers)) {
				ai := dht.bootstrapPeers[i]
				err := dht.Host().Connect(dht.Context(), ai)
				if err == nil {
					found++
				} else {
					logger.Warnw("failed to bootstrap", "peer", ai.ID, "error", err)
				}

				// Wait for two bootstrap peers, or try them all.
				//
				// Why two? In theory, one should be enough
				// normally. However, if the network were to
				// restart and everyone connected to just one
				// bootstrapper, we'll end up with a mostly
				// partitioned network.
				//
				// So we always bootstrap with two random peers.
				if found == maxNBoostrappers {
					break
				}
			}
		}

		// if we still don't have peers in our routing table(probably because Identify hasn't completed),
		// there is no point in triggering a Refresh.
		if dht.routingTable.Size() == 0 {
			continue
		}

464
		if dht.autoRefresh {
465
			dht.rtRefreshManager.RefreshNoWait()
466 467 468 469 470
		}
	}

}

Aarsh Shah's avatar
Aarsh Shah committed
471 472 473 474 475 476 477 478 479 480 481 482 483 484 485 486 487 488 489
// persistRTPeersInPeerStore periodically re-applies the
// RecentlyConnectedAddrTTL to the peerstore addresses of every peer in the
// routing table. It runs every third of that TTL and returns once the DHT's
// context is done.
//
// TODO: this is a workaround and SHOULD be removed once
// https://github.com/libp2p/go-libp2p/issues/800 goes in.
func (dht *IpfsDHT) persistRTPeersInPeerStore() {
	refresh := time.NewTicker(peerstore.RecentlyConnectedAddrTTL / 3)
	defer refresh.Stop()

	for {
		select {
		case <-dht.ctx.Done():
			return
		case <-refresh.C:
			for _, id := range dht.routingTable.ListPeers() {
				dht.peerstore.UpdateAddrs(id, peerstore.RecentlyConnectedAddrTTL, peerstore.RecentlyConnectedAddrTTL)
			}
		}
	}
}

Jeromy's avatar
Jeromy committed
490
// putValueToPeer stores the given key/value pair at the peer 'p'
491 492
func (dht *IpfsDHT) putValueToPeer(ctx context.Context, p peer.ID, rec *recpb.Record) error {
	pmes := pb.NewMessage(pb.Message_PUT_VALUE, rec.Key, 0)
493
	pmes.Record = rec
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
494
	rpmes, err := dht.sendRequest(ctx, p, pmes)
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
495
	if err != nil {
Steven Allen's avatar
Steven Allen committed
496
		logger.Debugw("failed to put value to peer", "to", p, "key", loggableKeyBytes(rec.Key), "error", err)
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
497 498
		return err
	}
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
499

500
	if !bytes.Equal(rpmes.GetRecord().Value, pmes.GetRecord().Value) {
Steven Allen's avatar
Steven Allen committed
501
		logger.Infow("value not put correctly", "put-message", pmes, "get-message", rpmes)
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
502 503
		return errors.New("value not put correctly")
	}
gpestana's avatar
gpestana committed
504

Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
505
	return nil
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
506 507
}

508 509
// errInvalidRecord is the sentinel returned when a peer answers with a
// record that fails validation.
var errInvalidRecord = errors.New("received invalid record")

510 511
// getValueOrPeers queries a particular peer p for the value for
// key. It returns either the value or a list of closer peers.
512
// NOTE: It will update the dht's peerstore with any new addresses
513
// it finds for the given peer.
514
func (dht *IpfsDHT) getValueOrPeers(ctx context.Context, p peer.ID, key string) (*recpb.Record, []*peer.AddrInfo, error) {
515
	pmes, err := dht.getValueSingle(ctx, p, key)
516
	if err != nil {
517
		return nil, nil, err
518 519
	}

520 521 522
	// Perhaps we were given closer peers
	peers := pb.PBPeersToPeerInfos(pmes.GetCloserPeers())

523
	if record := pmes.GetRecord(); record != nil {
524
		// Success! We were given the value
Steven Allen's avatar
Steven Allen committed
525
		logger.Debug("got value")
526

527
		// make sure record is valid.
528
		err = dht.Validator.Validate(string(record.GetKey()), record.GetValue())
529
		if err != nil {
Steven Allen's avatar
Steven Allen committed
530
			logger.Debug("received invalid record (discarded)")
531 532
			// return a sentinal to signify an invalid record was received
			err = errInvalidRecord
George Antoniadis's avatar
George Antoniadis committed
533
			record = new(recpb.Record)
534
		}
535
		return record, peers, err
536
	}
537

538 539 540 541
	if len(peers) > 0 {
		return nil, peers, nil
	}

542
	return nil, nil, routing.ErrNotFound
543 544
}

545
// getValueSingle simply performs the get value RPC with the given parameters
546
func (dht *IpfsDHT) getValueSingle(ctx context.Context, p peer.ID, key string) (*pb.Message, error) {
547
	pmes := pb.NewMessage(pb.Message_GET_VALUE, []byte(key), 0)
Steven Allen's avatar
Steven Allen committed
548
	return dht.sendRequest(ctx, p, pmes)
Jeromy's avatar
Jeromy committed
549 550
}

551
// getLocal attempts to retrieve the value from the datastore
552
func (dht *IpfsDHT) getLocal(key string) (*recpb.Record, error) {
Steven Allen's avatar
Steven Allen committed
553 554
	logger.Debugw("finding value in datastore", "key", loggableKeyString(key))

555
	rec, err := dht.getRecordFromDatastore(mkDsKey(key))
556
	if err != nil {
Steven Allen's avatar
Steven Allen committed
557
		logger.Warnw("get local failed", "key", key, "error", err)
558 559
		return nil, err
	}
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
560

561
	// Double check the key. Can't hurt.
562
	if rec != nil && string(rec.GetKey()) != key {
Steven Allen's avatar
Steven Allen committed
563
		logger.Errorw("BUG: found a DHT record that didn't match it's key", "expected", key, "got", rec.GetKey())
Steven Allen's avatar
Steven Allen committed
564
		return nil, nil
565 566

	}
567
	return rec, nil
568 569
}

570
// putLocal stores the key value pair in the datastore
571
func (dht *IpfsDHT) putLocal(key string, rec *recpb.Record) error {
572 573
	data, err := proto.Marshal(rec)
	if err != nil {
Steven Allen's avatar
Steven Allen committed
574
		logger.Warnw("failed to put marshal record for local put", "error", err, "key", key)
575 576 577
		return err
	}

578
	return dht.datastore.Put(mkDsKey(key), data)
579
}
580

Aarsh Shah's avatar
Aarsh Shah committed
581
// peerFound signals the routingTable that we've found a peer that
Aarsh Shah's avatar
Aarsh Shah committed
582
// might support the DHT protocol.
Aarsh Shah's avatar
Aarsh Shah committed
583 584
// If we have a connection a peer but no exchange of a query RPC ->
//    LastQueriedAt=time.Now (so we don't ping it for some time for a liveliness check)
585 586
//    LastUsefulAt=0
// If we connect to a peer and then exchange a query RPC ->
Aarsh Shah's avatar
Aarsh Shah committed
587 588 589 590 591 592 593
//    LastQueriedAt=time.Now (same reason as above)
//    LastUsefulAt=time.Now (so we give it some life in the RT without immediately evicting it)
// If we query a peer we already have in our Routing Table ->
//    LastQueriedAt=time.Now()
//    LastUsefulAt remains unchanged
// If we connect to a peer we already have in the RT but do not exchange a query (rare)
//    Do Nothing.
Aarsh Shah's avatar
Aarsh Shah committed
594
func (dht *IpfsDHT) peerFound(ctx context.Context, p peer.ID, queryPeer bool) {
595 596 597
	if c := baseLogger.Check(zap.DebugLevel, "peer found"); c != nil {
		c.Write(zap.String("peer", p.String()))
	}
Aarsh Shah's avatar
Aarsh Shah committed
598 599
	b, err := dht.validRTPeer(p)
	if err != nil {
Steven Allen's avatar
Steven Allen committed
600
		logger.Errorw("failed to validate if peer is a DHT peer", "peer", p, "error", err)
Aarsh Shah's avatar
Aarsh Shah committed
601
	} else if b {
Aarsh Shah's avatar
Aarsh Shah committed
602
		newlyAdded, err := dht.routingTable.TryAddPeer(p, queryPeer)
Steven Allen's avatar
Steven Allen committed
603 604 605 606
		if err != nil {
			// peer not added.
			return
		}
607
		if !newlyAdded && queryPeer {
Aarsh Shah's avatar
Aarsh Shah committed
608 609 610
			// the peer is already in our RT, but we just successfully queried it and so let's give it a
			// bump on the query time so we don't ping it too soon for a liveliness check.
			dht.routingTable.UpdateLastSuccessfulOutboundQueryAt(p, time.Now())
Aarsh Shah's avatar
Aarsh Shah committed
611 612
		}
	}
Aarsh Shah's avatar
Aarsh Shah committed
613 614
}

Aarsh Shah's avatar
Aarsh Shah committed
615
// peerStoppedDHT signals the routing table that a peer is unable to responsd to DHT queries anymore.
Aarsh Shah's avatar
Aarsh Shah committed
616
func (dht *IpfsDHT) peerStoppedDHT(ctx context.Context, p peer.ID) {
Steven Allen's avatar
Steven Allen committed
617
	logger.Debugw("peer stopped dht", "peer", p)
Aarsh Shah's avatar
Aarsh Shah committed
618 619
	// A peer that does not support the DHT protocol is dead for us.
	// There's no point in talking to anymore till it starts supporting the DHT protocol again.
Aarsh Shah's avatar
Aarsh Shah committed
620
	dht.routingTable.RemovePeer(p)
Aarsh Shah's avatar
Aarsh Shah committed
621 622
}

623 624 625 626
func (dht *IpfsDHT) fixRTIfNeeded() {
	select {
	case dht.fixLowPeersChan <- struct{}{}:
	default:
Aarsh Shah's avatar
Aarsh Shah committed
627
	}
628
}
Jeromy's avatar
Jeromy committed
629

Jeromy's avatar
Jeromy committed
630
// FindLocal looks for a peer with a given ID connected to this dht and returns the peer and the table it was found in.
631
func (dht *IpfsDHT) FindLocal(id peer.ID) peer.AddrInfo {
632
	switch dht.host.Network().Connectedness(id) {
633
	case network.Connected, network.CanConnect:
634 635
		return dht.peerstore.PeerInfo(id)
	default:
636
		return peer.AddrInfo{}
Jeromy's avatar
Jeromy committed
637 638
	}
}
639

Jeromy's avatar
Jeromy committed
640
// findPeerSingle asks peer 'p' if they know where the peer with id 'id' is
641
func (dht *IpfsDHT) findPeerSingle(ctx context.Context, p peer.ID, id peer.ID) (*pb.Message, error) {
642
	pmes := pb.NewMessage(pb.Message_FIND_NODE, []byte(id), 0)
Steven Allen's avatar
Steven Allen committed
643
	return dht.sendRequest(ctx, p, pmes)
644
}
645

Adin Schmahmann's avatar
Adin Schmahmann committed
646 647
func (dht *IpfsDHT) findProvidersSingle(ctx context.Context, p peer.ID, key multihash.Multihash) (*pb.Message, error) {
	pmes := pb.NewMessage(pb.Message_GET_PROVIDERS, key, 0)
Steven Allen's avatar
Steven Allen committed
648
	return dht.sendRequest(ctx, p, pmes)
Jeromy's avatar
Jeromy committed
649 650
}

651
// nearestPeersToQuery returns the routing tables closest peers.
652
func (dht *IpfsDHT) nearestPeersToQuery(pmes *pb.Message, count int) []peer.ID {
653
	closer := dht.routingTable.NearestPeers(kb.ConvertKey(string(pmes.GetKey())), count)
654 655 656
	return closer
}

Aarsh Shah's avatar
Aarsh Shah committed
657
// betterPeersToQuery returns nearestPeersToQuery with some additional filtering
658
func (dht *IpfsDHT) betterPeersToQuery(pmes *pb.Message, from peer.ID, count int) []peer.ID {
659
	closer := dht.nearestPeersToQuery(pmes, count)
660 661 662

	// no node? nil
	if closer == nil {
Steven Allen's avatar
Steven Allen committed
663
		logger.Infow("no closer peers to send", from)
664 665 666
		return nil
	}

Steven Allen's avatar
Steven Allen committed
667
	filtered := make([]peer.ID, 0, len(closer))
Jeromy's avatar
Jeromy committed
668 669 670
	for _, clp := range closer {

		// == to self? thats bad
Jeromy's avatar
Jeromy committed
671
		if clp == dht.self {
Matt Joiner's avatar
Matt Joiner committed
672
			logger.Error("BUG betterPeersToQuery: attempted to return self! this shouldn't happen...")
673 674
			return nil
		}
675
		// Dont send a peer back themselves
676
		if clp == from {
677 678 679
			continue
		}

Jeromy's avatar
Jeromy committed
680
		filtered = append(filtered, clp)
681 682
	}

683 684
	// ok seems like closer nodes
	return filtered
685 686
}

687 688 689 690 691 692 693 694 695 696 697 698 699 700 701 702 703 704
// setMode switches the DHT to mode m while holding modeLk. It is a no-op if
// the DHT is already in that mode, and returns an error for a mode value it
// does not recognize.
func (dht *IpfsDHT) setMode(m mode) error {
	dht.modeLk.Lock()
	defer dht.modeLk.Unlock()

	if dht.mode == m {
		return nil
	}

	if m == modeServer {
		return dht.moveToServerMode()
	}
	if m == modeClient {
		return dht.moveToClientMode()
	}
	return fmt.Errorf("unrecognized dht mode: %d", m)
}

Adin Schmahmann's avatar
Adin Schmahmann committed
705 706 707
// moveToServerMode advertises (via libp2p identify updates) that we are able to respond to DHT queries and sets the appropriate stream handlers.
// Note: We may support responding to queries with protocols aside from our primary ones in order to support
// interoperability with older versions of the DHT protocol.
708 709
func (dht *IpfsDHT) moveToServerMode() error {
	dht.mode = modeServer
Adin Schmahmann's avatar
Adin Schmahmann committed
710
	for _, p := range dht.serverProtocols {
711 712 713 714 715
		dht.host.SetStreamHandler(p, dht.handleNewStream)
	}
	return nil
}

Adin Schmahmann's avatar
Adin Schmahmann committed
716 717 718 719 720
// moveToClientMode stops advertising (and rescinds advertisements via libp2p identify updates) that we are able to
// respond to DHT queries and removes the appropriate stream handlers. We also kill all inbound streams that were
// utilizing the handled protocols.
// Note: We may support responding to queries with protocols aside from our primary ones in order to support
// interoperability with older versions of the DHT protocol.
721 722
func (dht *IpfsDHT) moveToClientMode() error {
	dht.mode = modeClient
Adin Schmahmann's avatar
Adin Schmahmann committed
723
	for _, p := range dht.serverProtocols {
724 725 726 727
		dht.host.RemoveStreamHandler(p)
	}

	pset := make(map[protocol.ID]bool)
Adin Schmahmann's avatar
Adin Schmahmann committed
728
	for _, p := range dht.serverProtocols {
729 730 731 732 733 734 735
		pset[p] = true
	}

	for _, c := range dht.host.Network().Conns() {
		for _, s := range c.GetStreams() {
			if pset[s.Protocol()] {
				if s.Stat().Direction == network.DirInbound {
Steven Allen's avatar
Steven Allen committed
736
					_ = s.Reset()
737 738 739 740 741 742 743 744 745 746 747 748 749
				}
			}
		}
	}
	return nil
}

// getMode returns the mode the DHT is currently operating in, read while
// holding modeLk.
func (dht *IpfsDHT) getMode() mode {
	dht.modeLk.Lock()
	current := dht.mode
	dht.modeLk.Unlock()
	return current
}

Alan Shaw's avatar
Alan Shaw committed
750
// Context returns the DHT's context.
751 752 753 754
func (dht *IpfsDHT) Context() context.Context {
	return dht.ctx
}

Alan Shaw's avatar
Alan Shaw committed
755
// Process returns the DHT's process.
756 757 758 759
func (dht *IpfsDHT) Process() goprocess.Process {
	return dht.proc
}

Alan Shaw's avatar
Alan Shaw committed
760
// RoutingTable returns the DHT's routingTable.
ZhengQi's avatar
ZhengQi committed
761 762 763 764
func (dht *IpfsDHT) RoutingTable() *kb.RoutingTable {
	return dht.routingTable
}

Alan Shaw's avatar
Alan Shaw committed
765
// Close calls Process Close.
766 767 768
func (dht *IpfsDHT) Close() error {
	return dht.proc.Close()
}
769 770 771 772

// mkDsKey converts a DHT key into a datastore key by base32-encoding it
// (standard alphabet, no padding).
func mkDsKey(s string) ds.Key {
	encoded := base32.RawStdEncoding.EncodeToString([]byte(s))
	return ds.NewKey(encoded)
}
773

Alan Shaw's avatar
Alan Shaw committed
774
// PeerID returns the DHT node's Peer ID.
775 776 777 778
func (dht *IpfsDHT) PeerID() peer.ID {
	return dht.self
}

Alan Shaw's avatar
Alan Shaw committed
779
// PeerKey returns a DHT key, converted from the DHT node's Peer ID.
780 781 782 783
func (dht *IpfsDHT) PeerKey() []byte {
	return kb.ConvertPeerID(dht.self)
}

Alan Shaw's avatar
Alan Shaw committed
784
// Host returns the libp2p host this DHT is operating with.
785 786 787 788
func (dht *IpfsDHT) Host() host.Host {
	return dht.host
}

Alan Shaw's avatar
Alan Shaw committed
789
// Ping sends a ping message to the passed peer and waits for a response.
790 791 792 793
func (dht *IpfsDHT) Ping(ctx context.Context, p peer.ID) error {
	req := pb.NewMessage(pb.Message_PING, nil, 0)
	resp, err := dht.sendRequest(ctx, p, req)
	if err != nil {
Steven Allen's avatar
Steven Allen committed
794
		return fmt.Errorf("sending request: %w", err)
795 796
	}
	if resp.Type != pb.Message_PING {
Steven Allen's avatar
Steven Allen committed
797
		return fmt.Errorf("got unexpected response type: %v", resp.Type)
798 799 800
	}
	return nil
}
801 802 803 804 805 806 807 808 809 810 811 812 813 814 815 816

// newContextWithLocalTags returns a new context.Context with the InstanceID and
// PeerID keys populated. Any extra tag.Mutators passed in are applied first,
// followed by the two local tags.
func (dht *IpfsDHT) newContextWithLocalTags(ctx context.Context, extraTags ...tag.Mutator) context.Context {
	mutators := append(
		extraTags,
		tag.Upsert(metrics.KeyPeerID, dht.self.Pretty()),
		tag.Upsert(metrics.KeyInstanceID, fmt.Sprintf("%p", dht)),
	)
	// The error is ignored as it is unrelated to the actual function of this code.
	tagged, _ := tag.New(ctx, mutators...)
	return tagged
}
817 818 819 820 821 822 823 824

// maybeAddAddrs records addrs for peer p in the peerstore with the given TTL,
// unless p is the local peer or a peer we are currently connected to.
func (dht *IpfsDHT) maybeAddAddrs(p peer.ID, addrs []ma.Multiaddr, ttl time.Duration) {
	// Don't add addresses for self. We have better ones.
	if p == dht.self {
		return
	}
	// Same for peers we are connected to.
	if dht.host.Network().Connectedness(p) == network.Connected {
		return
	}
	dht.peerstore.AddAddrs(p, addrs, ttl)
}