dht.go 24.2 KB
Newer Older
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
1 2
package dht

3
import (
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
4
	"bytes"
Jeromy's avatar
Jeromy committed
5
	"context"
6
	"errors"
7
	"fmt"
Aarsh Shah's avatar
Aarsh Shah committed
8
	"math"
9
	"math/rand"
10 11
	"sync"
	"time"
12

13 14 15 16 17 18
	"github.com/libp2p/go-libp2p-core/host"
	"github.com/libp2p/go-libp2p-core/network"
	"github.com/libp2p/go-libp2p-core/peer"
	"github.com/libp2p/go-libp2p-core/peerstore"
	"github.com/libp2p/go-libp2p-core/protocol"
	"github.com/libp2p/go-libp2p-core/routing"
19

20
	"github.com/libp2p/go-libp2p-kad-dht/metrics"
21
	pb "github.com/libp2p/go-libp2p-kad-dht/pb"
Aarsh Shah's avatar
Aarsh Shah committed
22
	"github.com/libp2p/go-libp2p-kad-dht/providers"
23
	"github.com/libp2p/go-libp2p-kad-dht/rtrefresh"
Aarsh Shah's avatar
Aarsh Shah committed
24 25 26
	kb "github.com/libp2p/go-libp2p-kbucket"
	record "github.com/libp2p/go-libp2p-record"
	recpb "github.com/libp2p/go-libp2p-record/pb"
27

Aarsh Shah's avatar
Aarsh Shah committed
28
	"github.com/gogo/protobuf/proto"
29 30
	ds "github.com/ipfs/go-datastore"
	logging "github.com/ipfs/go-log"
Aarsh Shah's avatar
Aarsh Shah committed
31
	"github.com/jbenet/goprocess"
Henrique Dias's avatar
Henrique Dias committed
32
	goprocessctx "github.com/jbenet/goprocess/context"
Steven Allen's avatar
Steven Allen committed
33
	"github.com/multiformats/go-base32"
34
	ma "github.com/multiformats/go-multiaddr"
Adin Schmahmann's avatar
Adin Schmahmann committed
35
	"github.com/multiformats/go-multihash"
36 37
	"go.opencensus.io/tag"
	"go.uber.org/zap"
38 39
)

40 41 42 43
var (
	// logger is the package logger, created under the name "dht". It is used
	// through its key/value methods (Debugw, Infow, Warnw, Errorw) in this file.
	logger     = logging.Logger("dht")
	// baseLogger is the desugared form of logger. peerFound uses its Check
	// method so the log fields are only built when the debug level is enabled.
	baseLogger = logger.Desugar()
)
44

Alan Shaw's avatar
Alan Shaw committed
45 46
// BaseConnMgrScore is the base of the score set on the connection manager "kbucket" tag.
// It is added with the common prefix length between two peer IDs.
Henrique Dias's avatar
Henrique Dias committed
47 48
const BaseConnMgrScore = 5 // makeRoutingTable tags a newly added peer with BaseConnMgrScore + common prefix length

49 50 51
// mode is the internal operating mode the DHT is currently in. It is distinct
// from ModeOpt, which is what the user configured.
type mode int

// The zero value of mode matches neither constant below.
const (
	modeServer mode = 1
	modeClient mode = 2
)

Adin Schmahmann's avatar
Adin Schmahmann committed
56 57 58 59 60
const (
	// kad1 is the protocol ID suffix of the older DHT protocol. makeDHT
	// appends it to the configured protocol prefix; in v1-compatible mode it is
	// the only protocol queried and served.
	kad1 protocol.ID = "/kad/1.0.0"
	// kad2 is the protocol ID suffix of the newer DHT protocol. Outside of
	// v1-compatible mode it is the only protocol queried with, while both kad2
	// and kad1 are served.
	kad2 protocol.ID = "/kad/2.0.0"
)

61 62 63 64 65
const (
	// dhtUsefulTag is the connection manager protection tag that is removed
	// (Unprotect) when a peer leaves the routing table.
	dhtUsefulTag = "dht-useful"
	// kbucketTag is the connection manager tag set on peers added to the
	// routing table and cleared when they are removed from it.
	kbucketTag   = "kbucket"
)

66
// IpfsDHT is an implementation of Kademlia with S/Kademlia modifications.
67
// It is used to implement the base Routing module.
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
68
type IpfsDHT struct {
69 70 71
	host      host.Host // the network services we need
	self      peer.ID   // Local peer (yourself)
	selfKey   kb.ID
72
	peerstore peerstore.Peerstore // Peer Registry
73

74
	datastore ds.Datastore // Local data
75

76
	routingTable *kb.RoutingTable // Array of routing tables for differently distanced nodes
77 78
	// ProviderManager stores & manages the provider records for this Dht peer.
	ProviderManager *providers.ProviderManager
79

80 81 82
	// manages Routing Table refresh
	rtRefreshManager *rtrefresh.RtRefreshManager

Aarsh Shah's avatar
Aarsh Shah committed
83
	birth time.Time // When this peer started up
84

85
	Validator record.Validator
86

87 88
	ctx  context.Context
	proc goprocess.Process
89 90 91

	strmap map[peer.ID]*messageSender
	smlk   sync.Mutex
92

Steven Allen's avatar
Steven Allen committed
93
	plk sync.Mutex
94

95 96
	stripedPutLocks [256]sync.Mutex

97 98
	// DHT protocols we query with. We'll only add peers to our routing
	// table if they speak these protocols.
99 100
	protocols     []protocol.ID
	protocolsStrs []string
Adin Schmahmann's avatar
Adin Schmahmann committed
101

102
	// DHT protocols we can respond to.
Adin Schmahmann's avatar
Adin Schmahmann committed
103
	serverProtocols []protocol.ID
104

105
	auto   ModeOpt
106 107 108
	mode   mode
	modeLk sync.Mutex

109
	bucketSize int
110
	alpha      int // The concurrency parameter per path
Adin Schmahmann's avatar
Adin Schmahmann committed
111
	beta       int // The number of peers closest to a target that must have responded for a query path to terminate
112

113 114 115
	queryPeerFilter        QueryFilterFunc
	routingTablePeerFilter RouteTableFilterFunc

116
	autoRefresh bool
Aarsh Shah's avatar
Aarsh Shah committed
117

118 119 120 121 122
	// A set of bootstrap peers to fallback on if all other attempts to fix
	// the routing table fail (or, e.g., this is the first time this node is
	// connecting to the network).
	bootstrapPeers []peer.AddrInfo

Aarsh Shah's avatar
Aarsh Shah committed
123
	maxRecordAge time.Duration
124

125 126 127
	// Allows disabling dht subsystems. These should _only_ be set on
	// "forked" DHTs (e.g., DHTs with custom protocols and/or private
	// networks).
128
	enableProviders, enableValues bool
129 130

	fixLowPeersChan chan struct{}
131 132
}

Matt Joiner's avatar
Matt Joiner committed
133 134 135 136
// Assert that IPFS assumptions about interfaces aren't broken. These aren't a
// guarantee, but we can use them to aid refactoring.
var (
	_ routing.ContentRouting = (*IpfsDHT)(nil)
137
	_ routing.Routing        = (*IpfsDHT)(nil)
Matt Joiner's avatar
Matt Joiner committed
138 139 140 141 142
	_ routing.PeerRouting    = (*IpfsDHT)(nil)
	_ routing.PubKeyFetcher  = (*IpfsDHT)(nil)
	_ routing.ValueStore     = (*IpfsDHT)(nil)
)

143
// New creates a new DHT with the specified host and options.
Aarsh Shah's avatar
Aarsh Shah committed
144 145 146
// Please note that being connected to a DHT peer does not necessarily imply that it's also in the DHT Routing Table.
// If the Routing Table has more than "minRTRefreshThreshold" peers, we consider a peer as a Routing Table candidate ONLY when
// we successfully get a query response from it OR if it send us a query.
147 148
func New(ctx context.Context, h host.Host, options ...Option) (*IpfsDHT, error) {
	var cfg config
Adin Schmahmann's avatar
Adin Schmahmann committed
149
	if err := cfg.apply(append([]Option{defaults}, options...)...); err != nil {
150 151
		return nil, err
	}
152 153 154
	if err := cfg.applyFallbacks(h); err != nil {
		return nil, err
	}
Aarsh Shah's avatar
Aarsh Shah committed
155

Adin Schmahmann's avatar
Adin Schmahmann committed
156 157
	if err := cfg.validate(); err != nil {
		return nil, err
158
	}
159

Aarsh Shah's avatar
Aarsh Shah committed
160 161 162 163
	dht, err := makeDHT(ctx, h, cfg)
	if err != nil {
		return nil, fmt.Errorf("failed to create DHT, err=%s", err)
	}
Adin Schmahmann's avatar
Adin Schmahmann committed
164

165
	dht.autoRefresh = cfg.routingTable.autoRefresh
166

167 168 169
	dht.maxRecordAge = cfg.maxRecordAge
	dht.enableProviders = cfg.enableProviders
	dht.enableValues = cfg.enableValues
Aarsh Shah's avatar
Aarsh Shah committed
170

171
	dht.Validator = cfg.validator
172

173
	dht.auto = cfg.mode
174
	switch cfg.mode {
175
	case ModeAuto, ModeClient:
176
		dht.mode = modeClient
177
	case ModeAutoServer, ModeServer:
178 179
		dht.mode = modeServer
	default:
180
		return nil, fmt.Errorf("invalid dht mode %d", cfg.mode)
181 182 183 184 185
	}

	if dht.mode == modeServer {
		if err := dht.moveToServerMode(); err != nil {
			return nil, err
186
		}
187
	}
188 189 190 191 192 193 194 195 196 197

	// register for event bus and network notifications
	sn, err := newSubscriberNotifiee(dht)
	if err != nil {
		return nil, err
	}
	dht.proc.Go(sn.subscribe)
	// handle providers
	dht.proc.AddChild(dht.ProviderManager.Process())

198 199 200
	if err := dht.rtRefreshManager.Start(); err != nil {
		return nil, err
	}
Aarsh Shah's avatar
Aarsh Shah committed
201 202 203 204

	// go-routine to make sure we ALWAYS have RT peer addresses in the peerstore
	// since RT membership is decoupled from connectivity
	go dht.persistRTPeersInPeerStore()
205 206 207 208

	// listens to the fix low peers chan and tries to fix the Routing Table
	dht.proc.Go(dht.fixLowPeersRoutine)

209 210
	return dht, nil
}
211

212 213 214 215
// NewDHT creates a new DHT object with the given peer as the 'local' host.
// IpfsDHT's initialized with this function will respond to DHT requests,
// whereas IpfsDHT's initialized with NewDHTClient will not.
func NewDHT(ctx context.Context, h host.Host, dstore ds.Batching) *IpfsDHT {
216
	dht, err := New(ctx, h, Datastore(dstore))
217 218 219 220 221 222 223 224 225 226 227
	if err != nil {
		panic(err)
	}
	return dht
}

// NewDHTClient creates a new DHT object with the given peer as the 'local'
// host. IpfsDHT clients initialized with this function will not respond to DHT
// requests. If you need a peer to respond to DHT requests, use NewDHT instead.
// NewDHTClient creates a new DHT object with the given peer as the 'local' host
func NewDHTClient(ctx context.Context, h host.Host, dstore ds.Batching) *IpfsDHT {
Adin Schmahmann's avatar
Adin Schmahmann committed
228
	dht, err := New(ctx, h, Datastore(dstore), Mode(ModeClient))
229 230 231
	if err != nil {
		panic(err)
	}
232 233 234
	return dht
}

Aarsh Shah's avatar
Aarsh Shah committed
235
func makeDHT(ctx context.Context, h host.Host, cfg config) (*IpfsDHT, error) {
236
	var protocols, serverProtocols []protocol.ID
Adin Schmahmann's avatar
Adin Schmahmann committed
237 238

	// check if custom test protocols were set
239
	if cfg.v1CompatibleMode {
240 241 242 243 244 245 246 247 248 249 250 251 252 253
		// In compat mode, query/serve using the old protocol.
		//
		// DO NOT accept requests on the new protocol. Otherwise:
		// 1. We'll end up in V2 routing tables.
		// 2. We'll have V1 peers in our routing table.
		//
		// In other words, we'll pollute the V2 network.
		protocols = []protocol.ID{cfg.protocolPrefix + kad1}
		serverProtocols = []protocol.ID{cfg.protocolPrefix + kad1}
	} else {
		// In v2 mode, serve on both protocols, but only
		// query/accept peers in v2 mode.
		protocols = []protocol.ID{cfg.protocolPrefix + kad2}
		serverProtocols = []protocol.ID{cfg.protocolPrefix + kad2, cfg.protocolPrefix + kad1}
Adin Schmahmann's avatar
Adin Schmahmann committed
254 255
	}

256
	dht := &IpfsDHT{
257 258
		datastore:              cfg.datastore,
		self:                   h.ID(),
259
		selfKey:                kb.ConvertPeerID(h.ID()),
260 261 262 263 264
		peerstore:              h.Peerstore(),
		host:                   h,
		strmap:                 make(map[peer.ID]*messageSender),
		birth:                  time.Now(),
		protocols:              protocols,
265
		protocolsStrs:          protocol.ConvertToStrings(protocols),
266 267 268 269 270 271
		serverProtocols:        serverProtocols,
		bucketSize:             cfg.bucketSize,
		alpha:                  cfg.concurrency,
		beta:                   cfg.resiliency,
		queryPeerFilter:        cfg.queryPeerFilter,
		routingTablePeerFilter: cfg.routingTable.peerFilter,
272
		fixLowPeersChan:        make(chan struct{}, 1),
Jeromy's avatar
Jeromy committed
273
	}
274

275 276 277 278 279 280 281 282 283 284 285 286 287 288
	var maxLastSuccessfulOutboundThreshold time.Duration

	// The threshold is calculated based on the expected amount of time that should pass before we
	// query a peer as part of our refresh cycle.
	// To grok the Math Wizardy that produced these exact equations, please be patient as a document explaining it will
	// be published soon.
	if cfg.concurrency < cfg.bucketSize { // (alpha < K)
		l1 := math.Log(float64(1) / float64(cfg.bucketSize))                              //(Log(1/K))
		l2 := math.Log(float64(1) - (float64(cfg.concurrency) / float64(cfg.bucketSize))) // Log(1 - (alpha / K))
		maxLastSuccessfulOutboundThreshold = time.Duration(l1 / l2 * float64(cfg.routingTable.refreshInterval))
	} else {
		maxLastSuccessfulOutboundThreshold = cfg.routingTable.refreshInterval
	}

Aarsh Shah's avatar
Aarsh Shah committed
289
	// construct routing table
290
	rt, err := makeRoutingTable(dht, cfg, maxLastSuccessfulOutboundThreshold)
Aarsh Shah's avatar
Aarsh Shah committed
291 292 293 294
	if err != nil {
		return nil, fmt.Errorf("failed to construct routing table,err=%s", err)
	}
	dht.routingTable = rt
295
	dht.bootstrapPeers = cfg.bootstrapPeers
Aarsh Shah's avatar
Aarsh Shah committed
296

297 298 299 300 301 302 303
	// rt refresh manager
	rtRefresh, err := makeRtRefreshManager(dht, cfg, maxLastSuccessfulOutboundThreshold)
	if err != nil {
		return nil, fmt.Errorf("failed to construct RT Refresh Manager,err=%s", err)
	}
	dht.rtRefreshManager = rtRefresh

304
	// create a DHT proc with the given context
305 306 307
	dht.proc = goprocessctx.WithContextAndTeardown(ctx, func() error {
		return rtRefresh.Close()
	})
Aarsh Shah's avatar
Aarsh Shah committed
308 309 310 311 312 313

	// create a tagged context derived from the original context
	ctxTags := dht.newContextWithLocalTags(ctx)
	// the DHT context should be done when the process is closed
	dht.ctx = goprocessctx.WithProcessClosing(ctxTags, dht.proc)

Alan Shaw's avatar
Alan Shaw committed
314 315 316 317 318
	pm, err := providers.NewProviderManager(dht.ctx, h.ID(), cfg.datastore, cfg.providersOptions...)
	if err != nil {
		return nil, err
	}
	dht.ProviderManager = pm
319

Aarsh Shah's avatar
Aarsh Shah committed
320
	return dht, nil
Jeromy's avatar
Jeromy committed
321 322
}

323 324 325 326 327
func makeRtRefreshManager(dht *IpfsDHT, cfg config, maxLastSuccessfulOutboundThreshold time.Duration) (*rtrefresh.RtRefreshManager, error) {
	keyGenFnc := func(cpl uint) (string, error) {
		p, err := dht.routingTable.GenRandPeerID(cpl)
		return string(p), err
	}
Aarsh Shah's avatar
Aarsh Shah committed
328

329 330 331 332 333 334 335 336 337 338 339 340 341 342 343 344 345
	queryFnc := func(ctx context.Context, key string) error {
		_, err := dht.GetClosestPeers(ctx, key)
		return err
	}

	r, err := rtrefresh.NewRtRefreshManager(
		dht.host, dht.routingTable, cfg.routingTable.autoRefresh,
		keyGenFnc,
		queryFnc,
		cfg.routingTable.refreshQueryTimeout,
		cfg.routingTable.refreshInterval,
		maxLastSuccessfulOutboundThreshold)

	return r, err
}

func makeRoutingTable(dht *IpfsDHT, cfg config, maxLastSuccessfulOutboundThreshold time.Duration) (*kb.RoutingTable, error) {
346
	rt, err := kb.NewRoutingTable(cfg.bucketSize, dht.selfKey, time.Minute, dht.host.Peerstore(), maxLastSuccessfulOutboundThreshold)
Aarsh Shah's avatar
Aarsh Shah committed
347
	cmgr := dht.host.ConnManager()
Aarsh Shah's avatar
Aarsh Shah committed
348 349

	rt.PeerAdded = func(p peer.ID) {
350 351
		commonPrefixLen := kb.CommonPrefixLen(dht.selfKey, kb.ConvertPeerID(p))
		cmgr.TagPeer(p, kbucketTag, BaseConnMgrScore+commonPrefixLen)
Aarsh Shah's avatar
Aarsh Shah committed
352 353
	}
	rt.PeerRemoved = func(p peer.ID) {
354 355
		cmgr.Unprotect(p, dhtUsefulTag)
		cmgr.UntagPeer(p, kbucketTag)
356 357 358

		// try to fix the RT
		dht.fixRTIfNeeded()
Aarsh Shah's avatar
Aarsh Shah committed
359 360 361 362
	}

	return rt, err
}
363

Will Scott's avatar
Will Scott committed
364 365 366 367 368
// Mode allows introspection of the operation mode of the DHT.
// It returns dht.auto, i.e. the ModeOpt that New copied from the config, not
// the internal client/server mode tracked in dht.mode.
func (dht *IpfsDHT) Mode() ModeOpt {
	return dht.auto
}

369
// fixLowPeersRoutine tries to get more peers into the routing table if we're below the threshold
370
func (dht *IpfsDHT) fixLowPeersRoutine(proc goprocess.Process) {
371 372 373
	timer := time.NewTimer(periodicBootstrapInterval)
	defer timer.Stop()

374 375 376
	for {
		select {
		case <-dht.fixLowPeersChan:
377
		case <-timer.C:
378 379 380
		case <-proc.Closing():
			return
		}
381

382 383 384 385
		if dht.routingTable.Size() > minRTRefreshThreshold {
			continue
		}

386 387
		// we try to add all peers we are connected to to the Routing Table
		// in case they aren't already there.
388 389 390 391
		for _, p := range dht.host.Network().Peers() {
			dht.peerFound(dht.Context(), p, false)
		}

392 393 394 395 396 397 398 399 400 401 402 403 404 405 406 407 408 409 410 411 412 413 414 415 416 417 418 419 420 421 422 423 424 425 426 427 428 429 430 431 432
		// TODO Active Bootstrapping
		// We should first use non-bootstrap peers we knew of from previous
		// snapshots of the Routing Table before we connect to the bootstrappers.
		// See https://github.com/libp2p/go-libp2p-kad-dht/issues/387.
		if dht.routingTable.Size() == 0 {
			if len(dht.bootstrapPeers) == 0 {
				// No point in continuing, we have no peers!
				continue
			}

			found := 0
			for _, i := range rand.Perm(len(dht.bootstrapPeers)) {
				ai := dht.bootstrapPeers[i]
				err := dht.Host().Connect(dht.Context(), ai)
				if err == nil {
					found++
				} else {
					logger.Warnw("failed to bootstrap", "peer", ai.ID, "error", err)
				}

				// Wait for two bootstrap peers, or try them all.
				//
				// Why two? In theory, one should be enough
				// normally. However, if the network were to
				// restart and everyone connected to just one
				// bootstrapper, we'll end up with a mostly
				// partitioned network.
				//
				// So we always bootstrap with two random peers.
				if found == maxNBoostrappers {
					break
				}
			}
		}

		// if we still don't have peers in our routing table(probably because Identify hasn't completed),
		// there is no point in triggering a Refresh.
		if dht.routingTable.Size() == 0 {
			continue
		}

433
		if dht.autoRefresh {
434
			dht.rtRefreshManager.RefreshNoWait()
435 436 437 438 439
		}
	}

}

Aarsh Shah's avatar
Aarsh Shah committed
440 441 442 443 444 445 446 447 448 449 450 451 452 453 454 455 456 457 458
// persistRTPeersInPeerStore periodically refreshes the address TTLs of every
// routing table peer in the peerstore, three times per
// peerstore.RecentlyConnectedAddrTTL, until the DHT context is done.
//
// TODO This is a hack and SHOULD be removed once
// https://github.com/libp2p/go-libp2p/issues/800 goes in.
func (dht *IpfsDHT) persistRTPeersInPeerStore() {
	refresh := time.NewTicker(peerstore.RecentlyConnectedAddrTTL / 3)
	defer refresh.Stop()

	for {
		select {
		case <-dht.ctx.Done():
			return
		case <-refresh.C:
		}

		for _, p := range dht.routingTable.ListPeers() {
			dht.peerstore.UpdateAddrs(p, peerstore.RecentlyConnectedAddrTTL, peerstore.RecentlyConnectedAddrTTL)
		}
	}
}

Jeromy's avatar
Jeromy committed
459
// putValueToPeer stores the given key/value pair at the peer 'p'
460 461
func (dht *IpfsDHT) putValueToPeer(ctx context.Context, p peer.ID, rec *recpb.Record) error {
	pmes := pb.NewMessage(pb.Message_PUT_VALUE, rec.Key, 0)
462
	pmes.Record = rec
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
463
	rpmes, err := dht.sendRequest(ctx, p, pmes)
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
464
	if err != nil {
Steven Allen's avatar
Steven Allen committed
465
		logger.Debugw("failed to put value to peer", "to", p, "key", loggableKeyBytes(rec.Key), "error", err)
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
466 467
		return err
	}
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
468

469
	if !bytes.Equal(rpmes.GetRecord().Value, pmes.GetRecord().Value) {
Steven Allen's avatar
Steven Allen committed
470
		logger.Infow("value not put correctly", "put-message", pmes, "get-message", rpmes)
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
471 472
		return errors.New("value not put correctly")
	}
gpestana's avatar
gpestana committed
473

Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
474
	return nil
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
475 476
}

477 478
// errInvalidRecord is the sentinel getValueOrPeers returns when a peer sends a
// record that fails validation.
var errInvalidRecord = errors.New("received invalid record")

479 480
// getValueOrPeers queries a particular peer p for the value for
// key. It returns either the value or a list of closer peers.
481
// NOTE: It will update the dht's peerstore with any new addresses
482
// it finds for the given peer.
483
func (dht *IpfsDHT) getValueOrPeers(ctx context.Context, p peer.ID, key string) (*recpb.Record, []*peer.AddrInfo, error) {
484
	pmes, err := dht.getValueSingle(ctx, p, key)
485
	if err != nil {
486
		return nil, nil, err
487 488
	}

489 490 491
	// Perhaps we were given closer peers
	peers := pb.PBPeersToPeerInfos(pmes.GetCloserPeers())

492
	if record := pmes.GetRecord(); record != nil {
493
		// Success! We were given the value
Steven Allen's avatar
Steven Allen committed
494
		logger.Debug("got value")
495

496
		// make sure record is valid.
497
		err = dht.Validator.Validate(string(record.GetKey()), record.GetValue())
498
		if err != nil {
Steven Allen's avatar
Steven Allen committed
499
			logger.Debug("received invalid record (discarded)")
500 501
			// return a sentinal to signify an invalid record was received
			err = errInvalidRecord
George Antoniadis's avatar
George Antoniadis committed
502
			record = new(recpb.Record)
503
		}
504
		return record, peers, err
505
	}
506

507 508 509 510
	if len(peers) > 0 {
		return nil, peers, nil
	}

511
	return nil, nil, routing.ErrNotFound
512 513
}

514
// getValueSingle simply performs the get value RPC with the given parameters
515
func (dht *IpfsDHT) getValueSingle(ctx context.Context, p peer.ID, key string) (*pb.Message, error) {
516
	pmes := pb.NewMessage(pb.Message_GET_VALUE, []byte(key), 0)
Steven Allen's avatar
Steven Allen committed
517
	return dht.sendRequest(ctx, p, pmes)
Jeromy's avatar
Jeromy committed
518 519
}

520
// getLocal attempts to retrieve the value from the datastore
521
func (dht *IpfsDHT) getLocal(key string) (*recpb.Record, error) {
Steven Allen's avatar
Steven Allen committed
522 523
	logger.Debugw("finding value in datastore", "key", loggableKeyString(key))

524
	rec, err := dht.getRecordFromDatastore(mkDsKey(key))
525
	if err != nil {
Steven Allen's avatar
Steven Allen committed
526
		logger.Warnw("get local failed", "key", key, "error", err)
527 528
		return nil, err
	}
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
529

530
	// Double check the key. Can't hurt.
531
	if rec != nil && string(rec.GetKey()) != key {
Steven Allen's avatar
Steven Allen committed
532
		logger.Errorw("BUG: found a DHT record that didn't match it's key", "expected", key, "got", rec.GetKey())
Steven Allen's avatar
Steven Allen committed
533
		return nil, nil
534 535

	}
536
	return rec, nil
537 538
}

539
// putLocal stores the key value pair in the datastore
540
func (dht *IpfsDHT) putLocal(key string, rec *recpb.Record) error {
541 542
	data, err := proto.Marshal(rec)
	if err != nil {
Steven Allen's avatar
Steven Allen committed
543
		logger.Warnw("failed to put marshal record for local put", "error", err, "key", key)
544 545 546
		return err
	}

547
	return dht.datastore.Put(mkDsKey(key), data)
548
}
549

Aarsh Shah's avatar
Aarsh Shah committed
550
// peerFound signals the routingTable that we've found a peer that
Aarsh Shah's avatar
Aarsh Shah committed
551
// might support the DHT protocol.
Aarsh Shah's avatar
Aarsh Shah committed
552 553
// If we have a connection a peer but no exchange of a query RPC ->
//    LastQueriedAt=time.Now (so we don't ping it for some time for a liveliness check)
554 555
//    LastUsefulAt=0
// If we connect to a peer and then exchange a query RPC ->
Aarsh Shah's avatar
Aarsh Shah committed
556 557 558 559 560 561 562
//    LastQueriedAt=time.Now (same reason as above)
//    LastUsefulAt=time.Now (so we give it some life in the RT without immediately evicting it)
// If we query a peer we already have in our Routing Table ->
//    LastQueriedAt=time.Now()
//    LastUsefulAt remains unchanged
// If we connect to a peer we already have in the RT but do not exchange a query (rare)
//    Do Nothing.
Aarsh Shah's avatar
Aarsh Shah committed
563
func (dht *IpfsDHT) peerFound(ctx context.Context, p peer.ID, queryPeer bool) {
564 565 566
	if c := baseLogger.Check(zap.DebugLevel, "peer found"); c != nil {
		c.Write(zap.String("peer", p.String()))
	}
Aarsh Shah's avatar
Aarsh Shah committed
567 568
	b, err := dht.validRTPeer(p)
	if err != nil {
Steven Allen's avatar
Steven Allen committed
569
		logger.Errorw("failed to validate if peer is a DHT peer", "peer", p, "error", err)
Aarsh Shah's avatar
Aarsh Shah committed
570
	} else if b {
Aarsh Shah's avatar
Aarsh Shah committed
571
		newlyAdded, err := dht.routingTable.TryAddPeer(p, queryPeer)
Steven Allen's avatar
Steven Allen committed
572 573 574 575
		if err != nil {
			// peer not added.
			return
		}
576
		if !newlyAdded && queryPeer {
Aarsh Shah's avatar
Aarsh Shah committed
577 578 579
			// the peer is already in our RT, but we just successfully queried it and so let's give it a
			// bump on the query time so we don't ping it too soon for a liveliness check.
			dht.routingTable.UpdateLastSuccessfulOutboundQueryAt(p, time.Now())
Aarsh Shah's avatar
Aarsh Shah committed
580 581
		}
	}
Aarsh Shah's avatar
Aarsh Shah committed
582 583
}

Aarsh Shah's avatar
Aarsh Shah committed
584
// peerStoppedDHT signals the routing table that a peer is unable to responsd to DHT queries anymore.
Aarsh Shah's avatar
Aarsh Shah committed
585
func (dht *IpfsDHT) peerStoppedDHT(ctx context.Context, p peer.ID) {
Steven Allen's avatar
Steven Allen committed
586
	logger.Debugw("peer stopped dht", "peer", p)
Aarsh Shah's avatar
Aarsh Shah committed
587 588
	// A peer that does not support the DHT protocol is dead for us.
	// There's no point in talking to anymore till it starts supporting the DHT protocol again.
Aarsh Shah's avatar
Aarsh Shah committed
589
	dht.routingTable.RemovePeer(p)
Aarsh Shah's avatar
Aarsh Shah committed
590 591
}

592 593 594 595
func (dht *IpfsDHT) fixRTIfNeeded() {
	select {
	case dht.fixLowPeersChan <- struct{}{}:
	default:
Aarsh Shah's avatar
Aarsh Shah committed
596
	}
597
}
Jeromy's avatar
Jeromy committed
598

Jeromy's avatar
Jeromy committed
599
// FindLocal looks for a peer with a given ID connected to this dht and returns the peer and the table it was found in.
600
func (dht *IpfsDHT) FindLocal(id peer.ID) peer.AddrInfo {
601
	switch dht.host.Network().Connectedness(id) {
602
	case network.Connected, network.CanConnect:
603 604
		return dht.peerstore.PeerInfo(id)
	default:
605
		return peer.AddrInfo{}
Jeromy's avatar
Jeromy committed
606 607
	}
}
608

Jeromy's avatar
Jeromy committed
609
// findPeerSingle asks peer 'p' if they know where the peer with id 'id' is
610
func (dht *IpfsDHT) findPeerSingle(ctx context.Context, p peer.ID, id peer.ID) (*pb.Message, error) {
611
	pmes := pb.NewMessage(pb.Message_FIND_NODE, []byte(id), 0)
Steven Allen's avatar
Steven Allen committed
612
	return dht.sendRequest(ctx, p, pmes)
613
}
614

Adin Schmahmann's avatar
Adin Schmahmann committed
615 616
func (dht *IpfsDHT) findProvidersSingle(ctx context.Context, p peer.ID, key multihash.Multihash) (*pb.Message, error) {
	pmes := pb.NewMessage(pb.Message_GET_PROVIDERS, key, 0)
Steven Allen's avatar
Steven Allen committed
617
	return dht.sendRequest(ctx, p, pmes)
Jeromy's avatar
Jeromy committed
618 619
}

620
// nearestPeersToQuery returns the routing tables closest peers.
621
func (dht *IpfsDHT) nearestPeersToQuery(pmes *pb.Message, count int) []peer.ID {
622
	closer := dht.routingTable.NearestPeers(kb.ConvertKey(string(pmes.GetKey())), count)
623 624 625
	return closer
}

Aarsh Shah's avatar
Aarsh Shah committed
626
// betterPeersToQuery returns nearestPeersToQuery with some additional filtering
627
func (dht *IpfsDHT) betterPeersToQuery(pmes *pb.Message, from peer.ID, count int) []peer.ID {
628
	closer := dht.nearestPeersToQuery(pmes, count)
629 630 631

	// no node? nil
	if closer == nil {
Steven Allen's avatar
Steven Allen committed
632
		logger.Infow("no closer peers to send", from)
633 634 635
		return nil
	}

Steven Allen's avatar
Steven Allen committed
636
	filtered := make([]peer.ID, 0, len(closer))
Jeromy's avatar
Jeromy committed
637 638 639
	for _, clp := range closer {

		// == to self? thats bad
Jeromy's avatar
Jeromy committed
640
		if clp == dht.self {
Matt Joiner's avatar
Matt Joiner committed
641
			logger.Error("BUG betterPeersToQuery: attempted to return self! this shouldn't happen...")
642 643
			return nil
		}
644
		// Dont send a peer back themselves
645
		if clp == from {
646 647 648
			continue
		}

Jeromy's avatar
Jeromy committed
649
		filtered = append(filtered, clp)
650 651
	}

652 653
	// ok seems like closer nodes
	return filtered
654 655
}

656 657 658 659 660 661 662 663 664 665 666 667 668 669 670 671 672 673
// setMode switches the DHT to mode m while holding modeLk. It does nothing if
// the DHT is already in that mode, and returns an error for an unknown mode.
func (dht *IpfsDHT) setMode(m mode) error {
	dht.modeLk.Lock()
	defer dht.modeLk.Unlock()

	switch {
	case m == dht.mode:
		return nil
	case m == modeServer:
		return dht.moveToServerMode()
	case m == modeClient:
		return dht.moveToClientMode()
	}
	return fmt.Errorf("unrecognized dht mode: %d", m)
}

Adin Schmahmann's avatar
Adin Schmahmann committed
674 675 676
// moveToServerMode advertises (via libp2p identify updates) that we are able to respond to DHT queries and sets the appropriate stream handlers.
// Note: We may support responding to queries with protocols aside from our primary ones in order to support
// interoperability with older versions of the DHT protocol.
677 678
func (dht *IpfsDHT) moveToServerMode() error {
	dht.mode = modeServer
Adin Schmahmann's avatar
Adin Schmahmann committed
679
	for _, p := range dht.serverProtocols {
680 681 682 683 684
		dht.host.SetStreamHandler(p, dht.handleNewStream)
	}
	return nil
}

Adin Schmahmann's avatar
Adin Schmahmann committed
685 686 687 688 689
// moveToClientMode stops advertising (and rescinds advertisements via libp2p identify updates) that we are able to
// respond to DHT queries and removes the appropriate stream handlers. We also kill all inbound streams that were
// utilizing the handled protocols.
// Note: We may support responding to queries with protocols aside from our primary ones in order to support
// interoperability with older versions of the DHT protocol.
690 691
func (dht *IpfsDHT) moveToClientMode() error {
	dht.mode = modeClient
Adin Schmahmann's avatar
Adin Schmahmann committed
692
	for _, p := range dht.serverProtocols {
693 694 695 696
		dht.host.RemoveStreamHandler(p)
	}

	pset := make(map[protocol.ID]bool)
Adin Schmahmann's avatar
Adin Schmahmann committed
697
	for _, p := range dht.serverProtocols {
698 699 700 701 702 703 704
		pset[p] = true
	}

	for _, c := range dht.host.Network().Conns() {
		for _, s := range c.GetStreams() {
			if pset[s.Protocol()] {
				if s.Stat().Direction == network.DirInbound {
Steven Allen's avatar
Steven Allen committed
705
					_ = s.Reset()
706 707 708 709 710 711 712 713 714 715 716 717 718
				}
			}
		}
	}
	return nil
}

// getMode returns the DHT's current internal mode. It takes modeLk for the
// read, the same lock setMode holds while changing the mode.
func (dht *IpfsDHT) getMode() mode {
	dht.modeLk.Lock()
	defer dht.modeLk.Unlock()
	return dht.mode
}

Alan Shaw's avatar
Alan Shaw committed
719
// Context returns the DHT's context.
720 721 722 723
func (dht *IpfsDHT) Context() context.Context {
	// dht.ctx is built in makeDHT from the caller's context and is done once
	// the DHT process is closing.
	return dht.ctx
}

Alan Shaw's avatar
Alan Shaw committed
724
// Process returns the DHT's process.
725 726 727 728
func (dht *IpfsDHT) Process() goprocess.Process {
	// dht.proc is created in makeDHT; New attaches the DHT's background
	// routines and the provider manager's process to it.
	return dht.proc
}

Alan Shaw's avatar
Alan Shaw committed
729
// RoutingTable returns the DHT's routingTable.
ZhengQi's avatar
ZhengQi committed
730 731 732 733
func (dht *IpfsDHT) RoutingTable() *kb.RoutingTable {
	// Returns the live table pointer, not a copy or snapshot.
	return dht.routingTable
}

Alan Shaw's avatar
Alan Shaw committed
734
// Close calls Process Close.
735 736 737
func (dht *IpfsDHT) Close() error {
	// The process was created in makeDHT with a teardown function that
	// closes the routing table refresh manager.
	return dht.proc.Close()
}
738 739 740 741

// mkDsKey converts a record key into a datastore key by encoding the key's
// bytes with base32.RawStdEncoding and wrapping the result with ds.NewKey.
func mkDsKey(s string) ds.Key {
	return ds.NewKey(base32.RawStdEncoding.EncodeToString([]byte(s)))
}
742

Alan Shaw's avatar
Alan Shaw committed
743
// PeerID returns the DHT node's Peer ID.
744 745 746 747
func (dht *IpfsDHT) PeerID() peer.ID {
	// dht.self is set from the host's ID in makeDHT.
	return dht.self
}

Alan Shaw's avatar
Alan Shaw committed
748
// PeerKey returns a DHT key, converted from the DHT node's Peer ID.
749 750 751 752
func (dht *IpfsDHT) PeerKey() []byte {
	// The key is converted from dht.self on every call; the cached
	// dht.selfKey is not returned here.
	return kb.ConvertPeerID(dht.self)
}

Alan Shaw's avatar
Alan Shaw committed
753
// Host returns the libp2p host this DHT is operating with.
754 755 756 757
func (dht *IpfsDHT) Host() host.Host {
	// This is the host that was passed to New.
	return dht.host
}

Alan Shaw's avatar
Alan Shaw committed
758
// Ping sends a ping message to the passed peer and waits for a response.
759 760 761 762
func (dht *IpfsDHT) Ping(ctx context.Context, p peer.ID) error {
	req := pb.NewMessage(pb.Message_PING, nil, 0)
	resp, err := dht.sendRequest(ctx, p, req)
	if err != nil {
Steven Allen's avatar
Steven Allen committed
763
		return fmt.Errorf("sending request: %w", err)
764 765
	}
	if resp.Type != pb.Message_PING {
Steven Allen's avatar
Steven Allen committed
766
		return fmt.Errorf("got unexpected response type: %v", resp.Type)
767 768 769
	}
	return nil
}
770 771 772 773 774 775 776 777 778 779 780 781 782 783 784 785

// newContextWithLocalTags derives a context from ctx that carries this DHT's
// peer ID and instance ID as metric tags, in addition to any extraTags the
// caller supplies. The instance ID is the DHT's pointer value formatted with %p.
func (dht *IpfsDHT) newContextWithLocalTags(ctx context.Context, extraTags ...tag.Mutator) context.Context {
	peerTag := tag.Upsert(metrics.KeyPeerID, dht.self.Pretty())
	instanceTag := tag.Upsert(metrics.KeyInstanceID, fmt.Sprintf("%p", dht))
	mutators := append(extraTags, peerTag, instanceTag)

	// The error is deliberately dropped: it is unrelated to the actual
	// function of this code.
	tagged, _ := tag.New(ctx, mutators...)
	return tagged
}
786 787 788 789 790 791 792 793

// maybeAddAddrs records addrs for peer p in the peerstore with the given ttl,
// unless p is ourselves or a peer we are currently connected to.
func (dht *IpfsDHT) maybeAddAddrs(p peer.ID, addrs []ma.Multiaddr, ttl time.Duration) {
	// Don't add addresses for self or our connected peers. We have better ones.
	if p == dht.self {
		return
	}
	if dht.host.Network().Connectedness(p) == network.Connected {
		return
	}
	dht.peerstore.AddAddrs(p, addrs, ttl)
}