dht.go 24.2 KB
Newer Older
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
1 2
package dht

3
import (
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
4
	"bytes"
Jeromy's avatar
Jeromy committed
5
	"context"
6
	"errors"
7
	"fmt"
Aarsh Shah's avatar
Aarsh Shah committed
8
	"math"
9
	"math/rand"
10 11
	"sync"
	"time"
12

13 14 15 16 17 18
	"github.com/libp2p/go-libp2p-core/host"
	"github.com/libp2p/go-libp2p-core/network"
	"github.com/libp2p/go-libp2p-core/peer"
	"github.com/libp2p/go-libp2p-core/peerstore"
	"github.com/libp2p/go-libp2p-core/protocol"
	"github.com/libp2p/go-libp2p-core/routing"
19

20
	"github.com/libp2p/go-libp2p-kad-dht/metrics"
21
	pb "github.com/libp2p/go-libp2p-kad-dht/pb"
Aarsh Shah's avatar
Aarsh Shah committed
22
	"github.com/libp2p/go-libp2p-kad-dht/providers"
23
	"github.com/libp2p/go-libp2p-kad-dht/rtrefresh"
Aarsh Shah's avatar
Aarsh Shah committed
24 25 26
	kb "github.com/libp2p/go-libp2p-kbucket"
	record "github.com/libp2p/go-libp2p-record"
	recpb "github.com/libp2p/go-libp2p-record/pb"
27

Aarsh Shah's avatar
Aarsh Shah committed
28
	"github.com/gogo/protobuf/proto"
29 30
	ds "github.com/ipfs/go-datastore"
	logging "github.com/ipfs/go-log"
Aarsh Shah's avatar
Aarsh Shah committed
31
	"github.com/jbenet/goprocess"
Henrique Dias's avatar
Henrique Dias committed
32
	goprocessctx "github.com/jbenet/goprocess/context"
Steven Allen's avatar
Steven Allen committed
33
	"github.com/multiformats/go-base32"
34
	ma "github.com/multiformats/go-multiaddr"
Adin Schmahmann's avatar
Adin Schmahmann committed
35
	"github.com/multiformats/go-multihash"
36 37
	"go.opencensus.io/tag"
	"go.uber.org/zap"
38 39
)

40 41 42 43
var (
	// logger is the package-wide logger, registered under the "dht" name.
	logger     = logging.Logger("dht")
	// baseLogger is the desugared form of logger. peerFound uses its
	// Check/Write API so log fields are only built when the level is enabled.
	baseLogger = logger.Desugar()
)
44

Alan Shaw's avatar
Alan Shaw committed
45 46
// BaseConnMgrScore is the base of the score set on the connection manager "kbucket" tag.
// It is added with the common prefix length between two peer IDs.
Henrique Dias's avatar
Henrique Dias committed
47 48
// BaseConnMgrScore is the base value of the "kbucket" connection manager tag.
// makeRoutingTable's PeerAdded callback tags each added peer with
// BaseConnMgrScore plus the common prefix length between the local ID and the
// peer's ID.
const BaseConnMgrScore = 5

49 50 51
// mode is the operating mode the DHT is currently in. It is distinct from
// ModeOpt, which is the mode the DHT was configured with.
type mode int

const (
	// modeServer means stream handlers for the server protocols are
	// registered (see moveToServerMode). Values start at iota + 1 so that the
	// zero value of mode is neither server nor client.
	modeServer mode = iota + 1
	// modeClient means the server protocol stream handlers are removed
	// (see moveToClientMode).
	modeClient
)

Adin Schmahmann's avatar
Adin Schmahmann committed
56 57 58 59 60
const (
	// kad1 is the protocol ID suffix of the older DHT protocol. makeDHT
	// appends it to the configured protocol prefix.
	kad1 protocol.ID = "/kad/1.0.0"
	// kad2 is the protocol ID suffix of the newer DHT protocol. makeDHT
	// appends it to the configured protocol prefix.
	kad2 protocol.ID = "/kad/2.0.0"
)

61
// IpfsDHT is an implementation of Kademlia with S/Kademlia modifications.
62
// It is used to implement the base Routing module.
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
63
type IpfsDHT struct {
64 65 66
	host      host.Host           // the network services we need
	self      peer.ID             // Local peer (yourself)
	peerstore peerstore.Peerstore // Peer Registry
67

68
	datastore ds.Datastore // Local data
69

70
	routingTable *kb.RoutingTable // Array of routing tables for differently distanced nodes
71 72
	// ProviderManager stores & manages the provider records for this Dht peer.
	ProviderManager *providers.ProviderManager
73

74 75 76
	// manages Routing Table refresh
	rtRefreshManager *rtrefresh.RtRefreshManager

Aarsh Shah's avatar
Aarsh Shah committed
77
	birth time.Time // When this peer started up
78

79
	Validator record.Validator
80

81 82
	ctx  context.Context
	proc goprocess.Process
83 84 85

	strmap map[peer.ID]*messageSender
	smlk   sync.Mutex
86

Steven Allen's avatar
Steven Allen committed
87
	plk sync.Mutex
88

89 90
	stripedPutLocks [256]sync.Mutex

91 92
	// DHT protocols we query with. We'll only add peers to our routing
	// table if they speak these protocols.
93 94
	protocols     []protocol.ID
	protocolsStrs []string
Adin Schmahmann's avatar
Adin Schmahmann committed
95

96
	// DHT protocols we can respond to.
Adin Schmahmann's avatar
Adin Schmahmann committed
97
	serverProtocols []protocol.ID
98

99
	auto   ModeOpt
100 101 102
	mode   mode
	modeLk sync.Mutex

103
	bucketSize int
104
	alpha      int // The concurrency parameter per path
Adin Schmahmann's avatar
Adin Schmahmann committed
105
	beta       int // The number of peers closest to a target that must have responded for a query path to terminate
106

107 108 109
	queryPeerFilter        QueryFilterFunc
	routingTablePeerFilter RouteTableFilterFunc

110
	autoRefresh bool
Aarsh Shah's avatar
Aarsh Shah committed
111

112 113 114 115 116
	// A set of bootstrap peers to fallback on if all other attempts to fix
	// the routing table fail (or, e.g., this is the first time this node is
	// connecting to the network).
	bootstrapPeers []peer.AddrInfo

Aarsh Shah's avatar
Aarsh Shah committed
117
	maxRecordAge time.Duration
118

119 120 121
	// Allows disabling dht subsystems. These should _only_ be set on
	// "forked" DHTs (e.g., DHTs with custom protocols and/or private
	// networks).
122
	enableProviders, enableValues bool
123 124

	fixLowPeersChan chan struct{}
125 126
}

Matt Joiner's avatar
Matt Joiner committed
127 128 129 130
// Assert that IPFS assumptions about interfaces aren't broken. These aren't a
// guarantee, but we can use them to aid refactoring.
var (
	_ routing.ContentRouting = (*IpfsDHT)(nil)
131
	_ routing.Routing        = (*IpfsDHT)(nil)
Matt Joiner's avatar
Matt Joiner committed
132 133 134 135 136
	_ routing.PeerRouting    = (*IpfsDHT)(nil)
	_ routing.PubKeyFetcher  = (*IpfsDHT)(nil)
	_ routing.ValueStore     = (*IpfsDHT)(nil)
)

137
// New creates a new DHT with the specified host and options.
Aarsh Shah's avatar
Aarsh Shah committed
138 139 140
// Please note that being connected to a DHT peer does not necessarily imply that it's also in the DHT Routing Table.
// If the Routing Table has more than "minRTRefreshThreshold" peers, we consider a peer as a Routing Table candidate ONLY when
// we successfully get a query response from it OR if it send us a query.
141 142
func New(ctx context.Context, h host.Host, options ...Option) (*IpfsDHT, error) {
	var cfg config
Adin Schmahmann's avatar
Adin Schmahmann committed
143
	if err := cfg.apply(append([]Option{defaults}, options...)...); err != nil {
144 145
		return nil, err
	}
146 147 148
	if err := cfg.applyFallbacks(h); err != nil {
		return nil, err
	}
Aarsh Shah's avatar
Aarsh Shah committed
149

Adin Schmahmann's avatar
Adin Schmahmann committed
150 151
	if err := cfg.validate(); err != nil {
		return nil, err
152
	}
153

Aarsh Shah's avatar
Aarsh Shah committed
154 155 156 157
	dht, err := makeDHT(ctx, h, cfg)
	if err != nil {
		return nil, fmt.Errorf("failed to create DHT, err=%s", err)
	}
Adin Schmahmann's avatar
Adin Schmahmann committed
158

159
	dht.autoRefresh = cfg.routingTable.autoRefresh
160

161 162 163
	dht.maxRecordAge = cfg.maxRecordAge
	dht.enableProviders = cfg.enableProviders
	dht.enableValues = cfg.enableValues
Aarsh Shah's avatar
Aarsh Shah committed
164

165
	dht.Validator = cfg.validator
166

167
	dht.auto = cfg.mode
168
	switch cfg.mode {
169
	case ModeAuto, ModeClient:
170
		dht.mode = modeClient
171
	case ModeAutoServer, ModeServer:
172 173
		dht.mode = modeServer
	default:
174
		return nil, fmt.Errorf("invalid dht mode %d", cfg.mode)
175 176 177 178 179
	}

	if dht.mode == modeServer {
		if err := dht.moveToServerMode(); err != nil {
			return nil, err
180
		}
181
	}
182 183 184 185 186 187 188 189 190 191

	// register for event bus and network notifications
	sn, err := newSubscriberNotifiee(dht)
	if err != nil {
		return nil, err
	}
	dht.proc.Go(sn.subscribe)
	// handle providers
	dht.proc.AddChild(dht.ProviderManager.Process())

192 193 194
	if err := dht.rtRefreshManager.Start(); err != nil {
		return nil, err
	}
Aarsh Shah's avatar
Aarsh Shah committed
195 196 197 198

	// go-routine to make sure we ALWAYS have RT peer addresses in the peerstore
	// since RT membership is decoupled from connectivity
	go dht.persistRTPeersInPeerStore()
199 200 201 202

	// listens to the fix low peers chan and tries to fix the Routing Table
	dht.proc.Go(dht.fixLowPeersRoutine)

203 204
	return dht, nil
}
205

206 207 208 209
// NewDHT creates a new DHT object with the given peer as the 'local' host.
// IpfsDHT's initialized with this function will respond to DHT requests,
// whereas IpfsDHT's initialized with NewDHTClient will not.
func NewDHT(ctx context.Context, h host.Host, dstore ds.Batching) *IpfsDHT {
210
	dht, err := New(ctx, h, Datastore(dstore))
211 212 213 214 215 216 217 218 219 220 221
	if err != nil {
		panic(err)
	}
	return dht
}

// NewDHTClient creates a new DHT object with the given peer as the 'local'
// host. IpfsDHT clients initialized with this function will not respond to DHT
// requests. If you need a peer to respond to DHT requests, use NewDHT instead.
// NewDHTClient creates a new DHT object with the given peer as the 'local' host
func NewDHTClient(ctx context.Context, h host.Host, dstore ds.Batching) *IpfsDHT {
Adin Schmahmann's avatar
Adin Schmahmann committed
222
	dht, err := New(ctx, h, Datastore(dstore), Mode(ModeClient))
223 224 225
	if err != nil {
		panic(err)
	}
226 227 228
	return dht
}

Aarsh Shah's avatar
Aarsh Shah committed
229
func makeDHT(ctx context.Context, h host.Host, cfg config) (*IpfsDHT, error) {
230
	var protocols, serverProtocols []protocol.ID
Adin Schmahmann's avatar
Adin Schmahmann committed
231 232

	// check if custom test protocols were set
233
	if cfg.v1CompatibleMode {
234 235 236 237 238 239 240 241 242 243 244 245 246 247
		// In compat mode, query/serve using the old protocol.
		//
		// DO NOT accept requests on the new protocol. Otherwise:
		// 1. We'll end up in V2 routing tables.
		// 2. We'll have V1 peers in our routing table.
		//
		// In other words, we'll pollute the V2 network.
		protocols = []protocol.ID{cfg.protocolPrefix + kad1}
		serverProtocols = []protocol.ID{cfg.protocolPrefix + kad1}
	} else {
		// In v2 mode, serve on both protocols, but only
		// query/accept peers in v2 mode.
		protocols = []protocol.ID{cfg.protocolPrefix + kad2}
		serverProtocols = []protocol.ID{cfg.protocolPrefix + kad2, cfg.protocolPrefix + kad1}
Adin Schmahmann's avatar
Adin Schmahmann committed
248 249
	}

250
	dht := &IpfsDHT{
251 252 253 254 255 256 257
		datastore:              cfg.datastore,
		self:                   h.ID(),
		peerstore:              h.Peerstore(),
		host:                   h,
		strmap:                 make(map[peer.ID]*messageSender),
		birth:                  time.Now(),
		protocols:              protocols,
258
		protocolsStrs:          protocol.ConvertToStrings(protocols),
259 260 261 262 263 264
		serverProtocols:        serverProtocols,
		bucketSize:             cfg.bucketSize,
		alpha:                  cfg.concurrency,
		beta:                   cfg.resiliency,
		queryPeerFilter:        cfg.queryPeerFilter,
		routingTablePeerFilter: cfg.routingTable.peerFilter,
265
		fixLowPeersChan:        make(chan struct{}, 1),
Jeromy's avatar
Jeromy committed
266
	}
267

268 269 270 271 272 273 274 275 276 277 278 279 280 281
	var maxLastSuccessfulOutboundThreshold time.Duration

	// The threshold is calculated based on the expected amount of time that should pass before we
	// query a peer as part of our refresh cycle.
	// To grok the Math Wizardy that produced these exact equations, please be patient as a document explaining it will
	// be published soon.
	if cfg.concurrency < cfg.bucketSize { // (alpha < K)
		l1 := math.Log(float64(1) / float64(cfg.bucketSize))                              //(Log(1/K))
		l2 := math.Log(float64(1) - (float64(cfg.concurrency) / float64(cfg.bucketSize))) // Log(1 - (alpha / K))
		maxLastSuccessfulOutboundThreshold = time.Duration(l1 / l2 * float64(cfg.routingTable.refreshInterval))
	} else {
		maxLastSuccessfulOutboundThreshold = cfg.routingTable.refreshInterval
	}

Aarsh Shah's avatar
Aarsh Shah committed
282
	// construct routing table
283 284
	// use twice the theoritical usefulness threhold to keep older peers around longer
	rt, err := makeRoutingTable(dht, cfg, 2*maxLastSuccessfulOutboundThreshold)
Aarsh Shah's avatar
Aarsh Shah committed
285 286 287 288
	if err != nil {
		return nil, fmt.Errorf("failed to construct routing table,err=%s", err)
	}
	dht.routingTable = rt
289
	dht.bootstrapPeers = cfg.bootstrapPeers
Aarsh Shah's avatar
Aarsh Shah committed
290

291 292 293 294 295 296 297
	// rt refresh manager
	rtRefresh, err := makeRtRefreshManager(dht, cfg, maxLastSuccessfulOutboundThreshold)
	if err != nil {
		return nil, fmt.Errorf("failed to construct RT Refresh Manager,err=%s", err)
	}
	dht.rtRefreshManager = rtRefresh

298
	// create a DHT proc with the given context
299 300 301
	dht.proc = goprocessctx.WithContextAndTeardown(ctx, func() error {
		return rtRefresh.Close()
	})
Aarsh Shah's avatar
Aarsh Shah committed
302 303 304 305 306 307

	// create a tagged context derived from the original context
	ctxTags := dht.newContextWithLocalTags(ctx)
	// the DHT context should be done when the process is closed
	dht.ctx = goprocessctx.WithProcessClosing(ctxTags, dht.proc)

Alan Shaw's avatar
Alan Shaw committed
308 309 310 311 312
	pm, err := providers.NewProviderManager(dht.ctx, h.ID(), cfg.datastore, cfg.providersOptions...)
	if err != nil {
		return nil, err
	}
	dht.ProviderManager = pm
313

Aarsh Shah's avatar
Aarsh Shah committed
314
	return dht, nil
Jeromy's avatar
Jeromy committed
315 316
}

317 318 319 320 321
func makeRtRefreshManager(dht *IpfsDHT, cfg config, maxLastSuccessfulOutboundThreshold time.Duration) (*rtrefresh.RtRefreshManager, error) {
	keyGenFnc := func(cpl uint) (string, error) {
		p, err := dht.routingTable.GenRandPeerID(cpl)
		return string(p), err
	}
Aarsh Shah's avatar
Aarsh Shah committed
322

323 324 325 326 327 328 329 330 331 332 333 334 335 336 337 338 339
	queryFnc := func(ctx context.Context, key string) error {
		_, err := dht.GetClosestPeers(ctx, key)
		return err
	}

	r, err := rtrefresh.NewRtRefreshManager(
		dht.host, dht.routingTable, cfg.routingTable.autoRefresh,
		keyGenFnc,
		queryFnc,
		cfg.routingTable.refreshQueryTimeout,
		cfg.routingTable.refreshInterval,
		maxLastSuccessfulOutboundThreshold)

	return r, err
}

func makeRoutingTable(dht *IpfsDHT, cfg config, maxLastSuccessfulOutboundThreshold time.Duration) (*kb.RoutingTable, error) {
Aarsh Shah's avatar
Aarsh Shah committed
340
	self := kb.ConvertPeerID(dht.host.ID())
Aarsh Shah's avatar
Aarsh Shah committed
341

Aarsh Shah's avatar
Aarsh Shah committed
342
	rt, err := kb.NewRoutingTable(cfg.bucketSize, self, time.Minute, dht.host.Peerstore(), maxLastSuccessfulOutboundThreshold)
Aarsh Shah's avatar
Aarsh Shah committed
343
	cmgr := dht.host.ConnManager()
Aarsh Shah's avatar
Aarsh Shah committed
344 345 346 347 348 349 350

	rt.PeerAdded = func(p peer.ID) {
		commonPrefixLen := kb.CommonPrefixLen(self, kb.ConvertPeerID(p))
		cmgr.TagPeer(p, "kbucket", BaseConnMgrScore+commonPrefixLen)
	}
	rt.PeerRemoved = func(p peer.ID) {
		cmgr.UntagPeer(p, "kbucket")
351 352 353

		// try to fix the RT
		dht.fixRTIfNeeded()
Aarsh Shah's avatar
Aarsh Shah committed
354 355 356 357
	}

	return rt, err
}
358

Will Scott's avatar
Will Scott committed
359 360 361 362 363
// Mode allows introspection of the operation mode of the DHT.
// It returns the ModeOpt the DHT was configured with (stored by New in
// dht.auto). The mode currently in effect is tracked separately in dht.mode.
func (dht *IpfsDHT) Mode() ModeOpt {
	return dht.auto
}

364
// fixLowPeersRoutine tries to get more peers into the routing table if we're below the threshold
365
func (dht *IpfsDHT) fixLowPeersRoutine(proc goprocess.Process) {
366 367 368
	timer := time.NewTimer(periodicBootstrapInterval)
	defer timer.Stop()

369 370 371
	for {
		select {
		case <-dht.fixLowPeersChan:
372
		case <-timer.C:
373 374 375
		case <-proc.Closing():
			return
		}
376

377 378 379 380
		if dht.routingTable.Size() > minRTRefreshThreshold {
			continue
		}

381 382
		// we try to add all peers we are connected to to the Routing Table
		// in case they aren't already there.
383 384 385 386
		for _, p := range dht.host.Network().Peers() {
			dht.peerFound(dht.Context(), p, false)
		}

387 388 389 390 391 392 393 394 395 396 397 398 399 400 401 402 403 404 405 406 407 408 409 410 411 412 413 414 415 416 417 418 419 420 421 422 423 424 425 426 427
		// TODO Active Bootstrapping
		// We should first use non-bootstrap peers we knew of from previous
		// snapshots of the Routing Table before we connect to the bootstrappers.
		// See https://github.com/libp2p/go-libp2p-kad-dht/issues/387.
		if dht.routingTable.Size() == 0 {
			if len(dht.bootstrapPeers) == 0 {
				// No point in continuing, we have no peers!
				continue
			}

			found := 0
			for _, i := range rand.Perm(len(dht.bootstrapPeers)) {
				ai := dht.bootstrapPeers[i]
				err := dht.Host().Connect(dht.Context(), ai)
				if err == nil {
					found++
				} else {
					logger.Warnw("failed to bootstrap", "peer", ai.ID, "error", err)
				}

				// Wait for two bootstrap peers, or try them all.
				//
				// Why two? In theory, one should be enough
				// normally. However, if the network were to
				// restart and everyone connected to just one
				// bootstrapper, we'll end up with a mostly
				// partitioned network.
				//
				// So we always bootstrap with two random peers.
				if found == maxNBoostrappers {
					break
				}
			}
		}

		// if we still don't have peers in our routing table(probably because Identify hasn't completed),
		// there is no point in triggering a Refresh.
		if dht.routingTable.Size() == 0 {
			continue
		}

428
		if dht.autoRefresh {
429
			dht.rtRefreshManager.RefreshNoWait()
430 431 432 433 434
		}
	}

}

Aarsh Shah's avatar
Aarsh Shah committed
435 436 437 438 439 440 441 442 443 444 445 446 447 448 449 450 451 452 453
// persistRTPeersInPeerStore keeps the peerstore addresses of routing table
// peers from expiring. Three times per peerstore.RecentlyConnectedAddrTTL it
// lists the routing table's peers and calls UpdateAddrs for each of them with
// that TTL as both the old and the new value. It returns once dht.ctx is done.
//
// TODO This is a workaround and SHOULD be removed once
// https://github.com/libp2p/go-libp2p/issues/800 goes in.
func (dht *IpfsDHT) persistRTPeersInPeerStore() {
	ticker := time.NewTicker(peerstore.RecentlyConnectedAddrTTL / 3)
	defer ticker.Stop()

	for {
		select {
		case <-dht.ctx.Done():
			return
		case <-ticker.C:
			for _, rtPeer := range dht.routingTable.ListPeers() {
				dht.peerstore.UpdateAddrs(rtPeer, peerstore.RecentlyConnectedAddrTTL, peerstore.RecentlyConnectedAddrTTL)
			}
		}
	}
}

Jeromy's avatar
Jeromy committed
454
// putValueToPeer stores the given key/value pair at the peer 'p'
455 456
func (dht *IpfsDHT) putValueToPeer(ctx context.Context, p peer.ID, rec *recpb.Record) error {
	pmes := pb.NewMessage(pb.Message_PUT_VALUE, rec.Key, 0)
457
	pmes.Record = rec
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
458
	rpmes, err := dht.sendRequest(ctx, p, pmes)
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
459
	if err != nil {
Steven Allen's avatar
Steven Allen committed
460
		logger.Debugw("failed to put value to peer", "to", p, "key", loggableKeyBytes(rec.Key), "error", err)
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
461 462
		return err
	}
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
463

464
	if !bytes.Equal(rpmes.GetRecord().Value, pmes.GetRecord().Value) {
Steven Allen's avatar
Steven Allen committed
465
		logger.Infow("value not put correctly", "put-message", pmes, "get-message", rpmes)
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
466 467
		return errors.New("value not put correctly")
	}
gpestana's avatar
gpestana committed
468

Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
469
	return nil
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
470 471
}

472 473
// errInvalidRecord is the sentinel returned by getValueOrPeers when a peer
// supplies a record that fails validation.
var errInvalidRecord = errors.New("received invalid record")

474 475
// getValueOrPeers queries a particular peer p for the value for
// key. It returns either the value or a list of closer peers.
476
// NOTE: It will update the dht's peerstore with any new addresses
477
// it finds for the given peer.
478
func (dht *IpfsDHT) getValueOrPeers(ctx context.Context, p peer.ID, key string) (*recpb.Record, []*peer.AddrInfo, error) {
479
	pmes, err := dht.getValueSingle(ctx, p, key)
480
	if err != nil {
481
		return nil, nil, err
482 483
	}

484 485 486
	// Perhaps we were given closer peers
	peers := pb.PBPeersToPeerInfos(pmes.GetCloserPeers())

487
	if record := pmes.GetRecord(); record != nil {
488
		// Success! We were given the value
Steven Allen's avatar
Steven Allen committed
489
		logger.Debug("got value")
490

491
		// make sure record is valid.
492
		err = dht.Validator.Validate(string(record.GetKey()), record.GetValue())
493
		if err != nil {
Steven Allen's avatar
Steven Allen committed
494
			logger.Debug("received invalid record (discarded)")
495 496
			// return a sentinal to signify an invalid record was received
			err = errInvalidRecord
George Antoniadis's avatar
George Antoniadis committed
497
			record = new(recpb.Record)
498
		}
499
		return record, peers, err
500
	}
501

502 503 504 505
	if len(peers) > 0 {
		return nil, peers, nil
	}

506
	return nil, nil, routing.ErrNotFound
507 508
}

509
// getValueSingle simply performs the get value RPC with the given parameters
510
func (dht *IpfsDHT) getValueSingle(ctx context.Context, p peer.ID, key string) (*pb.Message, error) {
511
	pmes := pb.NewMessage(pb.Message_GET_VALUE, []byte(key), 0)
Steven Allen's avatar
Steven Allen committed
512
	return dht.sendRequest(ctx, p, pmes)
Jeromy's avatar
Jeromy committed
513 514
}

515
// getLocal attempts to retrieve the value from the datastore
516
func (dht *IpfsDHT) getLocal(key string) (*recpb.Record, error) {
Steven Allen's avatar
Steven Allen committed
517 518
	logger.Debugw("finding value in datastore", "key", loggableKeyString(key))

519
	rec, err := dht.getRecordFromDatastore(mkDsKey(key))
520
	if err != nil {
Steven Allen's avatar
Steven Allen committed
521
		logger.Warnw("get local failed", "key", key, "error", err)
522 523
		return nil, err
	}
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
524

525
	// Double check the key. Can't hurt.
526
	if rec != nil && string(rec.GetKey()) != key {
Steven Allen's avatar
Steven Allen committed
527
		logger.Errorw("BUG: found a DHT record that didn't match it's key", "expected", key, "got", rec.GetKey())
Steven Allen's avatar
Steven Allen committed
528
		return nil, nil
529 530

	}
531
	return rec, nil
532 533
}

534
// putLocal stores the key value pair in the datastore
535
func (dht *IpfsDHT) putLocal(key string, rec *recpb.Record) error {
536 537
	data, err := proto.Marshal(rec)
	if err != nil {
Steven Allen's avatar
Steven Allen committed
538
		logger.Warnw("failed to put marshal record for local put", "error", err, "key", key)
539 540 541
		return err
	}

542
	return dht.datastore.Put(mkDsKey(key), data)
543
}
544

Aarsh Shah's avatar
Aarsh Shah committed
545
// peerFound signals the routingTable that we've found a peer that
Aarsh Shah's avatar
Aarsh Shah committed
546
// might support the DHT protocol.
Aarsh Shah's avatar
Aarsh Shah committed
547 548
// If we have a connection a peer but no exchange of a query RPC ->
//    LastQueriedAt=time.Now (so we don't ping it for some time for a liveliness check)
549 550
//    LastUsefulAt=0
// If we connect to a peer and then exchange a query RPC ->
Aarsh Shah's avatar
Aarsh Shah committed
551 552 553 554 555 556 557
//    LastQueriedAt=time.Now (same reason as above)
//    LastUsefulAt=time.Now (so we give it some life in the RT without immediately evicting it)
// If we query a peer we already have in our Routing Table ->
//    LastQueriedAt=time.Now()
//    LastUsefulAt remains unchanged
// If we connect to a peer we already have in the RT but do not exchange a query (rare)
//    Do Nothing.
Aarsh Shah's avatar
Aarsh Shah committed
558
func (dht *IpfsDHT) peerFound(ctx context.Context, p peer.ID, queryPeer bool) {
559 560 561
	if c := baseLogger.Check(zap.DebugLevel, "peer found"); c != nil {
		c.Write(zap.String("peer", p.String()))
	}
Aarsh Shah's avatar
Aarsh Shah committed
562 563
	b, err := dht.validRTPeer(p)
	if err != nil {
Steven Allen's avatar
Steven Allen committed
564
		logger.Errorw("failed to validate if peer is a DHT peer", "peer", p, "error", err)
Aarsh Shah's avatar
Aarsh Shah committed
565
	} else if b {
Aarsh Shah's avatar
Aarsh Shah committed
566
		newlyAdded, err := dht.routingTable.TryAddPeer(p, queryPeer)
Steven Allen's avatar
Steven Allen committed
567 568 569 570
		if err != nil {
			// peer not added.
			return
		}
571
		if !newlyAdded && queryPeer {
Aarsh Shah's avatar
Aarsh Shah committed
572 573 574
			// the peer is already in our RT, but we just successfully queried it and so let's give it a
			// bump on the query time so we don't ping it too soon for a liveliness check.
			dht.routingTable.UpdateLastSuccessfulOutboundQueryAt(p, time.Now())
Aarsh Shah's avatar
Aarsh Shah committed
575 576
		}
	}
Aarsh Shah's avatar
Aarsh Shah committed
577 578
}

Aarsh Shah's avatar
Aarsh Shah committed
579
// peerStoppedDHT signals the routing table that a peer is unable to responsd to DHT queries anymore.
Aarsh Shah's avatar
Aarsh Shah committed
580
func (dht *IpfsDHT) peerStoppedDHT(ctx context.Context, p peer.ID) {
Steven Allen's avatar
Steven Allen committed
581
	logger.Debugw("peer stopped dht", "peer", p)
Aarsh Shah's avatar
Aarsh Shah committed
582 583
	// A peer that does not support the DHT protocol is dead for us.
	// There's no point in talking to anymore till it starts supporting the DHT protocol again.
Aarsh Shah's avatar
Aarsh Shah committed
584
	dht.routingTable.RemovePeer(p)
Aarsh Shah's avatar
Aarsh Shah committed
585 586
}

587 588 589 590
func (dht *IpfsDHT) fixRTIfNeeded() {
	select {
	case dht.fixLowPeersChan <- struct{}{}:
	default:
Aarsh Shah's avatar
Aarsh Shah committed
591
	}
592
}
Jeromy's avatar
Jeromy committed
593

Jeromy's avatar
Jeromy committed
594
// FindLocal looks for a peer with a given ID connected to this dht and returns the peer and the table it was found in.
595
func (dht *IpfsDHT) FindLocal(id peer.ID) peer.AddrInfo {
596
	switch dht.host.Network().Connectedness(id) {
597
	case network.Connected, network.CanConnect:
598 599
		return dht.peerstore.PeerInfo(id)
	default:
600
		return peer.AddrInfo{}
Jeromy's avatar
Jeromy committed
601 602
	}
}
603

Jeromy's avatar
Jeromy committed
604
// findPeerSingle asks peer 'p' if they know where the peer with id 'id' is
605
func (dht *IpfsDHT) findPeerSingle(ctx context.Context, p peer.ID, id peer.ID) (*pb.Message, error) {
606
	pmes := pb.NewMessage(pb.Message_FIND_NODE, []byte(id), 0)
Steven Allen's avatar
Steven Allen committed
607
	return dht.sendRequest(ctx, p, pmes)
608
}
609

Adin Schmahmann's avatar
Adin Schmahmann committed
610 611
func (dht *IpfsDHT) findProvidersSingle(ctx context.Context, p peer.ID, key multihash.Multihash) (*pb.Message, error) {
	pmes := pb.NewMessage(pb.Message_GET_PROVIDERS, key, 0)
Steven Allen's avatar
Steven Allen committed
612
	return dht.sendRequest(ctx, p, pmes)
Jeromy's avatar
Jeromy committed
613 614
}

615
// nearestPeersToQuery returns the routing tables closest peers.
616
func (dht *IpfsDHT) nearestPeersToQuery(pmes *pb.Message, count int) []peer.ID {
617
	closer := dht.routingTable.NearestPeers(kb.ConvertKey(string(pmes.GetKey())), count)
618 619 620
	return closer
}

Aarsh Shah's avatar
Aarsh Shah committed
621
// betterPeersToQuery returns nearestPeersToQuery with some additional filtering
622
func (dht *IpfsDHT) betterPeersToQuery(pmes *pb.Message, from peer.ID, count int) []peer.ID {
623
	closer := dht.nearestPeersToQuery(pmes, count)
624 625 626

	// no node? nil
	if closer == nil {
Steven Allen's avatar
Steven Allen committed
627
		logger.Infow("no closer peers to send", from)
628 629 630
		return nil
	}

Steven Allen's avatar
Steven Allen committed
631
	filtered := make([]peer.ID, 0, len(closer))
Jeromy's avatar
Jeromy committed
632 633 634
	for _, clp := range closer {

		// == to self? thats bad
Jeromy's avatar
Jeromy committed
635
		if clp == dht.self {
Matt Joiner's avatar
Matt Joiner committed
636
			logger.Error("BUG betterPeersToQuery: attempted to return self! this shouldn't happen...")
637 638
			return nil
		}
639
		// Dont send a peer back themselves
640
		if clp == from {
641 642 643
			continue
		}

Jeromy's avatar
Jeromy committed
644
		filtered = append(filtered, clp)
645 646
	}

647 648
	// ok seems like closer nodes
	return filtered
649 650
}

651 652 653 654 655 656 657 658 659 660 661 662 663 664 665 666 667 668
// setMode switches the DHT to mode m while holding modeLk. Asking for the mode
// already in effect is a no-op. An unknown mode yields an error and leaves the
// current mode untouched.
func (dht *IpfsDHT) setMode(m mode) error {
	dht.modeLk.Lock()
	defer dht.modeLk.Unlock()

	switch {
	case m == dht.mode:
		// nothing to change
		return nil
	case m == modeServer:
		return dht.moveToServerMode()
	case m == modeClient:
		return dht.moveToClientMode()
	}
	return fmt.Errorf("unrecognized dht mode: %d", m)
}

Adin Schmahmann's avatar
Adin Schmahmann committed
669 670 671
// moveToServerMode advertises (via libp2p identify updates) that we are able to respond to DHT queries and sets the appropriate stream handlers.
// Note: We may support responding to queries with protocols aside from our primary ones in order to support
// interoperability with older versions of the DHT protocol.
672 673
func (dht *IpfsDHT) moveToServerMode() error {
	dht.mode = modeServer
Adin Schmahmann's avatar
Adin Schmahmann committed
674
	for _, p := range dht.serverProtocols {
675 676 677 678 679
		dht.host.SetStreamHandler(p, dht.handleNewStream)
	}
	return nil
}

Adin Schmahmann's avatar
Adin Schmahmann committed
680 681 682 683 684
// moveToClientMode stops advertising (and rescinds advertisements via libp2p identify updates) that we are able to
// respond to DHT queries and removes the appropriate stream handlers. We also kill all inbound streams that were
// utilizing the handled protocols.
// Note: We may support responding to queries with protocols aside from our primary ones in order to support
// interoperability with older versions of the DHT protocol.
685 686
func (dht *IpfsDHT) moveToClientMode() error {
	dht.mode = modeClient
Adin Schmahmann's avatar
Adin Schmahmann committed
687
	for _, p := range dht.serverProtocols {
688 689 690 691
		dht.host.RemoveStreamHandler(p)
	}

	pset := make(map[protocol.ID]bool)
Adin Schmahmann's avatar
Adin Schmahmann committed
692
	for _, p := range dht.serverProtocols {
693 694 695 696 697 698 699
		pset[p] = true
	}

	for _, c := range dht.host.Network().Conns() {
		for _, s := range c.GetStreams() {
			if pset[s.Protocol()] {
				if s.Stat().Direction == network.DirInbound {
Steven Allen's avatar
Steven Allen committed
700
					_ = s.Reset()
701 702 703 704 705 706 707 708 709 710 711 712 713
				}
			}
		}
	}
	return nil
}

// getMode returns the operating mode currently in effect, reading it under
// modeLk.
func (dht *IpfsDHT) getMode() mode {
	dht.modeLk.Lock()
	defer dht.modeLk.Unlock()
	return dht.mode
}

Alan Shaw's avatar
Alan Shaw committed
714
// Context returns the DHT's context.
715 716 717 718
func (dht *IpfsDHT) Context() context.Context {
	// Set in makeDHT: derived from the context the DHT was built with and done
	// once the DHT's process starts closing.
	return dht.ctx
}

Alan Shaw's avatar
Alan Shaw committed
719
// Process returns the DHT's process.
720 721 722 723
func (dht *IpfsDHT) Process() goprocess.Process {
	// Created in makeDHT from the caller's context; its teardown closes the
	// routing table refresh manager.
	return dht.proc
}

Alan Shaw's avatar
Alan Shaw committed
724
// RoutingTable returns the DHT's routingTable.
ZhengQi's avatar
ZhengQi committed
725 726 727 728
func (dht *IpfsDHT) RoutingTable() *kb.RoutingTable {
	// This is the live table built by makeRoutingTable, not a copy.
	return dht.routingTable
}

Alan Shaw's avatar
Alan Shaw committed
729
// Close calls Process Close.
730 731 732
func (dht *IpfsDHT) Close() error {
	// Closing the process runs the teardown registered in makeDHT, which
	// closes the routing table refresh manager.
	return dht.proc.Close()
}
733 734 735 736

// mkDsKey turns s into a datastore key: the bytes of s are encoded as
// unpadded standard base32 and the result is wrapped in a ds.Key.
func mkDsKey(s string) ds.Key {
	encoded := base32.RawStdEncoding.EncodeToString([]byte(s))
	return ds.NewKey(encoded)
}
737

Alan Shaw's avatar
Alan Shaw committed
738
// PeerID returns the DHT node's Peer ID.
739 740 741 742
func (dht *IpfsDHT) PeerID() peer.ID {
	// self is the host's ID, captured when the DHT was built in makeDHT.
	return dht.self
}

Alan Shaw's avatar
Alan Shaw committed
743
// PeerKey returns a DHT key, converted from the DHT node's Peer ID.
744 745 746 747
func (dht *IpfsDHT) PeerKey() []byte {
	// Same conversion makeRoutingTable applies to the local ID when it keys
	// the routing table.
	return kb.ConvertPeerID(dht.self)
}

Alan Shaw's avatar
Alan Shaw committed
748
// Host returns the libp2p host this DHT is operating with.
749 750 751 752
func (dht *IpfsDHT) Host() host.Host {
	// The host handed to makeDHT when the DHT was built.
	return dht.host
}

Alan Shaw's avatar
Alan Shaw committed
753
// Ping sends a ping message to the passed peer and waits for a response.
754 755 756 757
func (dht *IpfsDHT) Ping(ctx context.Context, p peer.ID) error {
	req := pb.NewMessage(pb.Message_PING, nil, 0)
	resp, err := dht.sendRequest(ctx, p, req)
	if err != nil {
Steven Allen's avatar
Steven Allen committed
758
		return fmt.Errorf("sending request: %w", err)
759 760
	}
	if resp.Type != pb.Message_PING {
Steven Allen's avatar
Steven Allen committed
761
		return fmt.Errorf("got unexpected response type: %v", resp.Type)
762 763 764
	}
	return nil
}
765 766 767 768 769 770 771 772 773 774 775 776 777 778 779 780

// newContextWithLocalTags derives a context from ctx that carries this DHT's
// metrics tags: the peer ID (metrics.KeyPeerID) and an instance ID
// (metrics.KeyInstanceID, the DHT's pointer value formatted with %p). Any
// extraTags supplied by the caller are applied first.
func (dht *IpfsDHT) newContextWithLocalTags(ctx context.Context, extraTags ...tag.Mutator) context.Context {
	peerTag := tag.Upsert(metrics.KeyPeerID, dht.self.Pretty())
	instanceTag := tag.Upsert(metrics.KeyInstanceID, fmt.Sprintf("%p", dht))
	extraTags = append(extraTags, peerTag, instanceTag)

	// The error is ignored as it is unrelated to the actual function of this code.
	tagged, _ := tag.New(ctx, extraTags...)
	return tagged
}
781 782 783 784 785 786 787 788

// maybeAddAddrs records addrs for peer p in the peerstore with the given ttl,
// unless p is the local peer or a peer we are currently connected to. In those
// cases we already hold better addresses and the call does nothing.
func (dht *IpfsDHT) maybeAddAddrs(p peer.ID, addrs []ma.Multiaddr, ttl time.Duration) {
	if p == dht.self {
		return
	}
	if dht.host.Network().Connectedness(p) == network.Connected {
		return
	}
	dht.peerstore.AddAddrs(p, addrs, ttl)
}