dht.go 26.3 KB
Newer Older
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
1 2
package dht

3
import (
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
4
	"bytes"
Jeromy's avatar
Jeromy committed
5
	"context"
6
	"errors"
7
	"fmt"
Aarsh Shah's avatar
Aarsh Shah committed
8
	"math"
9
	"math/rand"
10 11
	"sync"
	"time"
12

13 14 15 16 17 18
	"github.com/libp2p/go-libp2p-core/host"
	"github.com/libp2p/go-libp2p-core/network"
	"github.com/libp2p/go-libp2p-core/peer"
	"github.com/libp2p/go-libp2p-core/peerstore"
	"github.com/libp2p/go-libp2p-core/protocol"
	"github.com/libp2p/go-libp2p-core/routing"
19

20
	"github.com/libp2p/go-libp2p-kad-dht/metrics"
21
	pb "github.com/libp2p/go-libp2p-kad-dht/pb"
Aarsh Shah's avatar
Aarsh Shah committed
22
	"github.com/libp2p/go-libp2p-kad-dht/providers"
23
	"github.com/libp2p/go-libp2p-kad-dht/rtrefresh"
Aarsh Shah's avatar
Aarsh Shah committed
24
	kb "github.com/libp2p/go-libp2p-kbucket"
25
	"github.com/libp2p/go-libp2p-kbucket/peerdiversity"
Aarsh Shah's avatar
Aarsh Shah committed
26 27
	record "github.com/libp2p/go-libp2p-record"
	recpb "github.com/libp2p/go-libp2p-record/pb"
28

Aarsh Shah's avatar
Aarsh Shah committed
29
	"github.com/gogo/protobuf/proto"
30 31
	ds "github.com/ipfs/go-datastore"
	logging "github.com/ipfs/go-log"
Aarsh Shah's avatar
Aarsh Shah committed
32
	"github.com/jbenet/goprocess"
Henrique Dias's avatar
Henrique Dias committed
33
	goprocessctx "github.com/jbenet/goprocess/context"
Steven Allen's avatar
Steven Allen committed
34
	"github.com/multiformats/go-base32"
35
	ma "github.com/multiformats/go-multiaddr"
Adin Schmahmann's avatar
Adin Schmahmann committed
36
	"github.com/multiformats/go-multihash"
37 38
	"go.opencensus.io/tag"
	"go.uber.org/zap"
39 40
)

41 42 43
var (
	logger     = logging.Logger("dht")
	baseLogger = logger.Desugar()
Aarsh Shah's avatar
Aarsh Shah committed
44 45

	rtFreezeTimeout = 1 * time.Minute
46
)
47

Steven Allen's avatar
Steven Allen committed
48 49 50 51 52 53
const (
	// BaseConnMgrScore is the base of the score set on the connection
	// manager "kbucket" tag. It is added with the common prefix length
	// between two peer IDs.
	baseConnMgrScore = 5
)
Henrique Dias's avatar
Henrique Dias committed
54

55 56 57
type mode int

const (
Steven Allen's avatar
Steven Allen committed
58 59
	modeServer mode = iota + 1
	modeClient
60 61
)

Adin Schmahmann's avatar
Adin Schmahmann committed
62 63 64 65 66
const (
	kad1 protocol.ID = "/kad/1.0.0"
	kad2 protocol.ID = "/kad/2.0.0"
)

67
const (
68 69
	kbucketTag       = "kbucket"
	protectedBuckets = 2
70 71
)

Aarsh Shah's avatar
Aarsh Shah committed
72 73 74 75 76
type addPeerRTReq struct {
	p         peer.ID
	queryPeer bool
}

77
// IpfsDHT is an implementation of Kademlia with S/Kademlia modifications.
78
// It is used to implement the base Routing module.
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
79
type IpfsDHT struct {
80 81 82
	host      host.Host // the network services we need
	self      peer.ID   // Local peer (yourself)
	selfKey   kb.ID
83
	peerstore peerstore.Peerstore // Peer Registry
84

85
	datastore ds.Datastore // Local data
86

87
	routingTable *kb.RoutingTable // Array of routing tables for differently distanced nodes
88 89
	// ProviderManager stores & manages the provider records for this Dht peer.
	ProviderManager *providers.ProviderManager
90

91 92 93
	// manages Routing Table refresh
	rtRefreshManager *rtrefresh.RtRefreshManager

Aarsh Shah's avatar
Aarsh Shah committed
94
	birth time.Time // When this peer started up
95

96
	Validator record.Validator
97

98 99
	ctx  context.Context
	proc goprocess.Process
100 101 102

	strmap map[peer.ID]*messageSender
	smlk   sync.Mutex
103

Steven Allen's avatar
Steven Allen committed
104
	plk sync.Mutex
105

106 107
	stripedPutLocks [256]sync.Mutex

108 109
	// DHT protocols we query with. We'll only add peers to our routing
	// table if they speak these protocols.
110 111
	protocols     []protocol.ID
	protocolsStrs []string
Adin Schmahmann's avatar
Adin Schmahmann committed
112

113
	// DHT protocols we can respond to.
Adin Schmahmann's avatar
Adin Schmahmann committed
114
	serverProtocols []protocol.ID
115

116
	auto   ModeOpt
117 118 119
	mode   mode
	modeLk sync.Mutex

120
	bucketSize int
121
	alpha      int // The concurrency parameter per path
Adin Schmahmann's avatar
Adin Schmahmann committed
122
	beta       int // The number of peers closest to a target that must have responded for a query path to terminate
123

124 125
	queryPeerFilter        QueryFilterFunc
	routingTablePeerFilter RouteTableFilterFunc
126
	rtPeerDiversityFilter  peerdiversity.PeerIPGroupFilter
127

128
	autoRefresh bool
Aarsh Shah's avatar
Aarsh Shah committed
129

130 131 132 133 134
	// A set of bootstrap peers to fallback on if all other attempts to fix
	// the routing table fail (or, e.g., this is the first time this node is
	// connecting to the network).
	bootstrapPeers []peer.AddrInfo

Aarsh Shah's avatar
Aarsh Shah committed
135
	maxRecordAge time.Duration
136

137 138 139
	// Allows disabling dht subsystems. These should _only_ be set on
	// "forked" DHTs (e.g., DHTs with custom protocols and/or private
	// networks).
140
	enableProviders, enableValues bool
141 142

	fixLowPeersChan chan struct{}
Aarsh Shah's avatar
Aarsh Shah committed
143 144 145 146 147

	addPeerToRTChan   chan addPeerRTReq
	refreshFinishedCh chan struct{}

	rtFreezeTimeout time.Duration
148 149
}

Matt Joiner's avatar
Matt Joiner committed
150 151 152 153
// Assert that IPFS assumptions about interfaces aren't broken. These aren't a
// guarantee, but we can use them to aid refactoring.
var (
	_ routing.ContentRouting = (*IpfsDHT)(nil)
154
	_ routing.Routing        = (*IpfsDHT)(nil)
Matt Joiner's avatar
Matt Joiner committed
155 156 157 158 159
	_ routing.PeerRouting    = (*IpfsDHT)(nil)
	_ routing.PubKeyFetcher  = (*IpfsDHT)(nil)
	_ routing.ValueStore     = (*IpfsDHT)(nil)
)

160
// New creates a new DHT with the specified host and options.
Aarsh Shah's avatar
Aarsh Shah committed
161 162 163
// Please note that being connected to a DHT peer does not necessarily imply that it's also in the DHT Routing Table.
// If the Routing Table has more than "minRTRefreshThreshold" peers, we consider a peer as a Routing Table candidate ONLY when
// we successfully get a query response from it OR if it send us a query.
164 165
func New(ctx context.Context, h host.Host, options ...Option) (*IpfsDHT, error) {
	var cfg config
Adin Schmahmann's avatar
Adin Schmahmann committed
166
	if err := cfg.apply(append([]Option{defaults}, options...)...); err != nil {
167 168
		return nil, err
	}
169 170 171
	if err := cfg.applyFallbacks(h); err != nil {
		return nil, err
	}
Aarsh Shah's avatar
Aarsh Shah committed
172

Adin Schmahmann's avatar
Adin Schmahmann committed
173 174
	if err := cfg.validate(); err != nil {
		return nil, err
175
	}
176

Aarsh Shah's avatar
Aarsh Shah committed
177 178 179 180
	dht, err := makeDHT(ctx, h, cfg)
	if err != nil {
		return nil, fmt.Errorf("failed to create DHT, err=%s", err)
	}
Adin Schmahmann's avatar
Adin Schmahmann committed
181

182
	dht.autoRefresh = cfg.routingTable.autoRefresh
183

184 185 186
	dht.maxRecordAge = cfg.maxRecordAge
	dht.enableProviders = cfg.enableProviders
	dht.enableValues = cfg.enableValues
Aarsh Shah's avatar
Aarsh Shah committed
187

188
	dht.Validator = cfg.validator
189

190
	dht.auto = cfg.mode
191
	switch cfg.mode {
192
	case ModeAuto, ModeClient:
193
		dht.mode = modeClient
194
	case ModeAutoServer, ModeServer:
195 196
		dht.mode = modeServer
	default:
197
		return nil, fmt.Errorf("invalid dht mode %d", cfg.mode)
198 199 200 201 202
	}

	if dht.mode == modeServer {
		if err := dht.moveToServerMode(); err != nil {
			return nil, err
203
		}
204
	}
205 206 207 208 209 210 211 212 213 214

	// register for event bus and network notifications
	sn, err := newSubscriberNotifiee(dht)
	if err != nil {
		return nil, err
	}
	dht.proc.Go(sn.subscribe)
	// handle providers
	dht.proc.AddChild(dht.ProviderManager.Process())

215 216 217
	if err := dht.rtRefreshManager.Start(); err != nil {
		return nil, err
	}
Aarsh Shah's avatar
Aarsh Shah committed
218 219 220 221

	// go-routine to make sure we ALWAYS have RT peer addresses in the peerstore
	// since RT membership is decoupled from connectivity
	go dht.persistRTPeersInPeerStore()
222 223

	// listens to the fix low peers chan and tries to fix the Routing Table
Aarsh Shah's avatar
Aarsh Shah committed
224 225 226 227 228
	if !cfg.disableFixLowPeers {
		dht.proc.Go(dht.fixLowPeersRoutine)
	}

	dht.proc.Go(dht.rtPeerLoop)
229

230 231
	return dht, nil
}
232

233 234 235 236
// NewDHT creates a new DHT object with the given peer as the 'local' host.
// IpfsDHT's initialized with this function will respond to DHT requests,
// whereas IpfsDHT's initialized with NewDHTClient will not.
func NewDHT(ctx context.Context, h host.Host, dstore ds.Batching) *IpfsDHT {
237
	dht, err := New(ctx, h, Datastore(dstore))
238 239 240 241 242 243 244 245 246 247
	if err != nil {
		panic(err)
	}
	return dht
}

// NewDHTClient creates a new DHT object with the given peer as the 'local'
// host. IpfsDHT clients initialized with this function will not respond to DHT
// requests. If you need a peer to respond to DHT requests, use NewDHT instead.
func NewDHTClient(ctx context.Context, h host.Host, dstore ds.Batching) *IpfsDHT {
Adin Schmahmann's avatar
Adin Schmahmann committed
248
	dht, err := New(ctx, h, Datastore(dstore), Mode(ModeClient))
249 250 251
	if err != nil {
		panic(err)
	}
252 253 254
	return dht
}

Aarsh Shah's avatar
Aarsh Shah committed
255
func makeDHT(ctx context.Context, h host.Host, cfg config) (*IpfsDHT, error) {
256
	var protocols, serverProtocols []protocol.ID
Adin Schmahmann's avatar
Adin Schmahmann committed
257 258

	// check if custom test protocols were set
259
	if cfg.v1CompatibleMode {
260 261 262 263 264 265 266 267 268 269 270 271 272 273
		// In compat mode, query/serve using the old protocol.
		//
		// DO NOT accept requests on the new protocol. Otherwise:
		// 1. We'll end up in V2 routing tables.
		// 2. We'll have V1 peers in our routing table.
		//
		// In other words, we'll pollute the V2 network.
		protocols = []protocol.ID{cfg.protocolPrefix + kad1}
		serverProtocols = []protocol.ID{cfg.protocolPrefix + kad1}
	} else {
		// In v2 mode, serve on both protocols, but only
		// query/accept peers in v2 mode.
		protocols = []protocol.ID{cfg.protocolPrefix + kad2}
		serverProtocols = []protocol.ID{cfg.protocolPrefix + kad2, cfg.protocolPrefix + kad1}
Adin Schmahmann's avatar
Adin Schmahmann committed
274 275
	}

276
	dht := &IpfsDHT{
277 278
		datastore:              cfg.datastore,
		self:                   h.ID(),
279
		selfKey:                kb.ConvertPeerID(h.ID()),
280 281 282 283 284
		peerstore:              h.Peerstore(),
		host:                   h,
		strmap:                 make(map[peer.ID]*messageSender),
		birth:                  time.Now(),
		protocols:              protocols,
285
		protocolsStrs:          protocol.ConvertToStrings(protocols),
286 287 288 289 290 291
		serverProtocols:        serverProtocols,
		bucketSize:             cfg.bucketSize,
		alpha:                  cfg.concurrency,
		beta:                   cfg.resiliency,
		queryPeerFilter:        cfg.queryPeerFilter,
		routingTablePeerFilter: cfg.routingTable.peerFilter,
292 293 294
		rtPeerDiversityFilter:  cfg.routingTable.diversityFilter,

		fixLowPeersChan: make(chan struct{}, 1),
Aarsh Shah's avatar
Aarsh Shah committed
295 296 297

		addPeerToRTChan:   make(chan addPeerRTReq),
		refreshFinishedCh: make(chan struct{}),
Jeromy's avatar
Jeromy committed
298
	}
299

300 301 302 303 304 305 306 307 308 309 310 311 312 313
	var maxLastSuccessfulOutboundThreshold time.Duration

	// The threshold is calculated based on the expected amount of time that should pass before we
	// query a peer as part of our refresh cycle.
	// To grok the Math Wizardy that produced these exact equations, please be patient as a document explaining it will
	// be published soon.
	if cfg.concurrency < cfg.bucketSize { // (alpha < K)
		l1 := math.Log(float64(1) / float64(cfg.bucketSize))                              //(Log(1/K))
		l2 := math.Log(float64(1) - (float64(cfg.concurrency) / float64(cfg.bucketSize))) // Log(1 - (alpha / K))
		maxLastSuccessfulOutboundThreshold = time.Duration(l1 / l2 * float64(cfg.routingTable.refreshInterval))
	} else {
		maxLastSuccessfulOutboundThreshold = cfg.routingTable.refreshInterval
	}

Aarsh Shah's avatar
Aarsh Shah committed
314
	// construct routing table
315 316
	// use twice the theoritical usefulness threhold to keep older peers around longer
	rt, err := makeRoutingTable(dht, cfg, 2*maxLastSuccessfulOutboundThreshold)
Aarsh Shah's avatar
Aarsh Shah committed
317 318 319 320
	if err != nil {
		return nil, fmt.Errorf("failed to construct routing table,err=%s", err)
	}
	dht.routingTable = rt
321
	dht.bootstrapPeers = cfg.bootstrapPeers
Aarsh Shah's avatar
Aarsh Shah committed
322

323 324 325 326 327 328 329
	// rt refresh manager
	rtRefresh, err := makeRtRefreshManager(dht, cfg, maxLastSuccessfulOutboundThreshold)
	if err != nil {
		return nil, fmt.Errorf("failed to construct RT Refresh Manager,err=%s", err)
	}
	dht.rtRefreshManager = rtRefresh

330
	// create a DHT proc with the given context
331 332 333
	dht.proc = goprocessctx.WithContextAndTeardown(ctx, func() error {
		return rtRefresh.Close()
	})
Aarsh Shah's avatar
Aarsh Shah committed
334 335 336 337 338 339

	// create a tagged context derived from the original context
	ctxTags := dht.newContextWithLocalTags(ctx)
	// the DHT context should be done when the process is closed
	dht.ctx = goprocessctx.WithProcessClosing(ctxTags, dht.proc)

Alan Shaw's avatar
Alan Shaw committed
340 341 342 343 344
	pm, err := providers.NewProviderManager(dht.ctx, h.ID(), cfg.datastore, cfg.providersOptions...)
	if err != nil {
		return nil, err
	}
	dht.ProviderManager = pm
345

Aarsh Shah's avatar
Aarsh Shah committed
346 347
	dht.rtFreezeTimeout = rtFreezeTimeout

Aarsh Shah's avatar
Aarsh Shah committed
348
	return dht, nil
Jeromy's avatar
Jeromy committed
349 350
}

351 352 353 354 355
func makeRtRefreshManager(dht *IpfsDHT, cfg config, maxLastSuccessfulOutboundThreshold time.Duration) (*rtrefresh.RtRefreshManager, error) {
	keyGenFnc := func(cpl uint) (string, error) {
		p, err := dht.routingTable.GenRandPeerID(cpl)
		return string(p), err
	}
Aarsh Shah's avatar
Aarsh Shah committed
356

357 358 359 360 361 362 363 364 365 366 367
	queryFnc := func(ctx context.Context, key string) error {
		_, err := dht.GetClosestPeers(ctx, key)
		return err
	}

	r, err := rtrefresh.NewRtRefreshManager(
		dht.host, dht.routingTable, cfg.routingTable.autoRefresh,
		keyGenFnc,
		queryFnc,
		cfg.routingTable.refreshQueryTimeout,
		cfg.routingTable.refreshInterval,
Aarsh Shah's avatar
Aarsh Shah committed
368 369
		maxLastSuccessfulOutboundThreshold,
		dht.refreshFinishedCh)
370 371 372 373 374

	return r, err
}

func makeRoutingTable(dht *IpfsDHT, cfg config, maxLastSuccessfulOutboundThreshold time.Duration) (*kb.RoutingTable, error) {
375 376 377 378 379 380 381 382 383 384 385 386 387 388 389
	// make a Routing Table Diversity Filter
	var filter *peerdiversity.Filter
	if dht.rtPeerDiversityFilter != nil {
		df, err := peerdiversity.NewFilter(dht.rtPeerDiversityFilter, "rt/diversity", func(p peer.ID) int {
			return kb.CommonPrefixLen(dht.selfKey, kb.ConvertPeerID(p))
		})

		if err != nil {
			return nil, fmt.Errorf("failed to construct peer diversity filter: %w", err)
		}

		filter = df
	}

	rt, err := kb.NewRoutingTable(cfg.bucketSize, dht.selfKey, time.Minute, dht.host.Peerstore(), maxLastSuccessfulOutboundThreshold, filter)
Aarsh Shah's avatar
Aarsh Shah committed
390
	cmgr := dht.host.ConnManager()
Aarsh Shah's avatar
Aarsh Shah committed
391 392

	rt.PeerAdded = func(p peer.ID) {
393
		commonPrefixLen := kb.CommonPrefixLen(dht.selfKey, kb.ConvertPeerID(p))
394 395 396 397 398
		if commonPrefixLen < protectedBuckets {
			cmgr.Protect(p, kbucketTag)
		} else {
			cmgr.TagPeer(p, kbucketTag, baseConnMgrScore)
		}
Aarsh Shah's avatar
Aarsh Shah committed
399 400
	}
	rt.PeerRemoved = func(p peer.ID) {
401
		cmgr.Unprotect(p, kbucketTag)
402
		cmgr.UntagPeer(p, kbucketTag)
403 404 405

		// try to fix the RT
		dht.fixRTIfNeeded()
Aarsh Shah's avatar
Aarsh Shah committed
406 407 408 409
	}

	return rt, err
}
410

411 412 413 414 415
// GetRoutingTableDiversityStats returns the diversity stats for the Routing Table.
func (d *IpfsDHT) GetRoutingTableDiversityStats() []peerdiversity.CplDiversityStats {
	return d.routingTable.GetDiversityStats()
}

Will Scott's avatar
Will Scott committed
416 417 418 419 420
// Mode allows introspection of the operation mode of the DHT
func (dht *IpfsDHT) Mode() ModeOpt {
	return dht.auto
}

421
// fixLowPeersRoutine tries to get more peers into the routing table if we're below the threshold
422
func (dht *IpfsDHT) fixLowPeersRoutine(proc goprocess.Process) {
423 424
	ticker := time.NewTicker(periodicBootstrapInterval)
	defer ticker.Stop()
425

426 427 428
	for {
		select {
		case <-dht.fixLowPeersChan:
429
		case <-ticker.C:
430 431 432
		case <-proc.Closing():
			return
		}
433

434 435 436 437
		if dht.routingTable.Size() > minRTRefreshThreshold {
			continue
		}

438 439
		// we try to add all peers we are connected to to the Routing Table
		// in case they aren't already there.
440 441 442 443
		for _, p := range dht.host.Network().Peers() {
			dht.peerFound(dht.Context(), p, false)
		}

444 445 446 447 448 449 450 451 452 453 454 455 456 457 458 459 460 461 462 463 464 465 466 467 468 469 470 471 472 473 474 475 476 477 478 479 480 481 482 483 484
		// TODO Active Bootstrapping
		// We should first use non-bootstrap peers we knew of from previous
		// snapshots of the Routing Table before we connect to the bootstrappers.
		// See https://github.com/libp2p/go-libp2p-kad-dht/issues/387.
		if dht.routingTable.Size() == 0 {
			if len(dht.bootstrapPeers) == 0 {
				// No point in continuing, we have no peers!
				continue
			}

			found := 0
			for _, i := range rand.Perm(len(dht.bootstrapPeers)) {
				ai := dht.bootstrapPeers[i]
				err := dht.Host().Connect(dht.Context(), ai)
				if err == nil {
					found++
				} else {
					logger.Warnw("failed to bootstrap", "peer", ai.ID, "error", err)
				}

				// Wait for two bootstrap peers, or try them all.
				//
				// Why two? In theory, one should be enough
				// normally. However, if the network were to
				// restart and everyone connected to just one
				// bootstrapper, we'll end up with a mostly
				// partitioned network.
				//
				// So we always bootstrap with two random peers.
				if found == maxNBoostrappers {
					break
				}
			}
		}

		// if we still don't have peers in our routing table(probably because Identify hasn't completed),
		// there is no point in triggering a Refresh.
		if dht.routingTable.Size() == 0 {
			continue
		}

485
		if dht.autoRefresh {
486
			dht.rtRefreshManager.RefreshNoWait()
487 488 489 490 491
		}
	}

}

Aarsh Shah's avatar
Aarsh Shah committed
492 493 494 495 496 497 498 499 500 501 502 503 504 505 506 507 508 509 510
// TODO This is hacky, horrible and the programmer needs to have his mother called a hamster.
// SHOULD be removed once https://github.com/libp2p/go-libp2p/issues/800 goes in.
func (dht *IpfsDHT) persistRTPeersInPeerStore() {
	tickr := time.NewTicker(peerstore.RecentlyConnectedAddrTTL / 3)
	defer tickr.Stop()

	for {
		select {
		case <-tickr.C:
			ps := dht.routingTable.ListPeers()
			for _, p := range ps {
				dht.peerstore.UpdateAddrs(p, peerstore.RecentlyConnectedAddrTTL, peerstore.RecentlyConnectedAddrTTL)
			}
		case <-dht.ctx.Done():
			return
		}
	}
}

Jeromy's avatar
Jeromy committed
511
// putValueToPeer stores the given key/value pair at the peer 'p'
512 513
func (dht *IpfsDHT) putValueToPeer(ctx context.Context, p peer.ID, rec *recpb.Record) error {
	pmes := pb.NewMessage(pb.Message_PUT_VALUE, rec.Key, 0)
514
	pmes.Record = rec
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
515
	rpmes, err := dht.sendRequest(ctx, p, pmes)
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
516
	if err != nil {
Steven Allen's avatar
Steven Allen committed
517
		logger.Debugw("failed to put value to peer", "to", p, "key", loggableKeyBytes(rec.Key), "error", err)
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
518 519
		return err
	}
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
520

521
	if !bytes.Equal(rpmes.GetRecord().Value, pmes.GetRecord().Value) {
Steven Allen's avatar
Steven Allen committed
522
		logger.Infow("value not put correctly", "put-message", pmes, "get-message", rpmes)
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
523 524
		return errors.New("value not put correctly")
	}
gpestana's avatar
gpestana committed
525

Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
526
	return nil
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
527 528
}

529 530
var errInvalidRecord = errors.New("received invalid record")

531 532
// getValueOrPeers queries a particular peer p for the value for
// key. It returns either the value or a list of closer peers.
533
// NOTE: It will update the dht's peerstore with any new addresses
534
// it finds for the given peer.
535
func (dht *IpfsDHT) getValueOrPeers(ctx context.Context, p peer.ID, key string) (*recpb.Record, []*peer.AddrInfo, error) {
536
	pmes, err := dht.getValueSingle(ctx, p, key)
537
	if err != nil {
538
		return nil, nil, err
539 540
	}

541 542 543
	// Perhaps we were given closer peers
	peers := pb.PBPeersToPeerInfos(pmes.GetCloserPeers())

544
	if record := pmes.GetRecord(); record != nil {
545
		// Success! We were given the value
Steven Allen's avatar
Steven Allen committed
546
		logger.Debug("got value")
547

548
		// make sure record is valid.
549
		err = dht.Validator.Validate(string(record.GetKey()), record.GetValue())
550
		if err != nil {
Steven Allen's avatar
Steven Allen committed
551
			logger.Debug("received invalid record (discarded)")
552 553
			// return a sentinal to signify an invalid record was received
			err = errInvalidRecord
George Antoniadis's avatar
George Antoniadis committed
554
			record = new(recpb.Record)
555
		}
556
		return record, peers, err
557
	}
558

559 560 561 562
	if len(peers) > 0 {
		return nil, peers, nil
	}

563
	return nil, nil, routing.ErrNotFound
564 565
}

566
// getValueSingle simply performs the get value RPC with the given parameters
567
func (dht *IpfsDHT) getValueSingle(ctx context.Context, p peer.ID, key string) (*pb.Message, error) {
568
	pmes := pb.NewMessage(pb.Message_GET_VALUE, []byte(key), 0)
Steven Allen's avatar
Steven Allen committed
569
	return dht.sendRequest(ctx, p, pmes)
Jeromy's avatar
Jeromy committed
570 571
}

572
// getLocal attempts to retrieve the value from the datastore
573
func (dht *IpfsDHT) getLocal(key string) (*recpb.Record, error) {
Steven Allen's avatar
Steven Allen committed
574 575
	logger.Debugw("finding value in datastore", "key", loggableKeyString(key))

576
	rec, err := dht.getRecordFromDatastore(mkDsKey(key))
577
	if err != nil {
Steven Allen's avatar
Steven Allen committed
578
		logger.Warnw("get local failed", "key", key, "error", err)
579 580
		return nil, err
	}
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
581

582
	// Double check the key. Can't hurt.
583
	if rec != nil && string(rec.GetKey()) != key {
Steven Allen's avatar
Steven Allen committed
584
		logger.Errorw("BUG: found a DHT record that didn't match it's key", "expected", key, "got", rec.GetKey())
Steven Allen's avatar
Steven Allen committed
585
		return nil, nil
586 587

	}
588
	return rec, nil
589 590
}

591
// putLocal stores the key value pair in the datastore
592
func (dht *IpfsDHT) putLocal(key string, rec *recpb.Record) error {
593 594
	data, err := proto.Marshal(rec)
	if err != nil {
Steven Allen's avatar
Steven Allen committed
595
		logger.Warnw("failed to put marshal record for local put", "error", err, "key", key)
596 597 598
		return err
	}

599
	return dht.datastore.Put(mkDsKey(key), data)
600
}
601

Aarsh Shah's avatar
Aarsh Shah committed
602 603 604 605 606 607 608 609 610 611 612 613 614 615 616 617 618 619 620 621 622 623 624 625 626 627 628 629 630 631 632 633 634 635 636 637 638 639 640 641 642 643 644 645
func (dht *IpfsDHT) rtPeerLoop(proc goprocess.Process) {
	bootstrapCount := 0
	isBootsrapping := false
	var timerCh <-chan time.Time

	for {
		select {
		case <-timerCh:
			dht.routingTable.MarkAllPeersIrreplaceable()
		case addReq := <-dht.addPeerToRTChan:
			prevSize := dht.routingTable.Size()
			if prevSize == 0 {
				isBootsrapping = true
				bootstrapCount = 0
				timerCh = nil
			}
			newlyAdded, err := dht.routingTable.TryAddPeer(addReq.p, addReq.queryPeer, isBootsrapping)
			if err != nil {
				// peer not added.
				continue
			}
			if !newlyAdded && addReq.queryPeer {
				// the peer is already in our RT, but we just successfully queried it and so let's give it a
				// bump on the query time so we don't ping it too soon for a liveliness check.
				dht.routingTable.UpdateLastSuccessfulOutboundQueryAt(addReq.p, time.Now())
			}
		case <-dht.refreshFinishedCh:
			bootstrapCount = bootstrapCount + 1
			if bootstrapCount == 2 {
				timerCh = time.NewTimer(dht.rtFreezeTimeout).C
			}

			old := isBootsrapping
			isBootsrapping = false
			if old {
				dht.rtRefreshManager.RefreshNoWait()
			}

		case <-proc.Closing():
			return
		}
	}
}

Aarsh Shah's avatar
Aarsh Shah committed
646
// peerFound signals the routingTable that we've found a peer that
Aarsh Shah's avatar
Aarsh Shah committed
647
// might support the DHT protocol.
Aarsh Shah's avatar
Aarsh Shah committed
648 649
// If we have a connection a peer but no exchange of a query RPC ->
//    LastQueriedAt=time.Now (so we don't ping it for some time for a liveliness check)
650 651
//    LastUsefulAt=0
// If we connect to a peer and then exchange a query RPC ->
Aarsh Shah's avatar
Aarsh Shah committed
652 653 654 655 656 657 658
//    LastQueriedAt=time.Now (same reason as above)
//    LastUsefulAt=time.Now (so we give it some life in the RT without immediately evicting it)
// If we query a peer we already have in our Routing Table ->
//    LastQueriedAt=time.Now()
//    LastUsefulAt remains unchanged
// If we connect to a peer we already have in the RT but do not exchange a query (rare)
//    Do Nothing.
Aarsh Shah's avatar
Aarsh Shah committed
659
func (dht *IpfsDHT) peerFound(ctx context.Context, p peer.ID, queryPeer bool) {
660 661 662
	if c := baseLogger.Check(zap.DebugLevel, "peer found"); c != nil {
		c.Write(zap.String("peer", p.String()))
	}
Aarsh Shah's avatar
Aarsh Shah committed
663 664
	b, err := dht.validRTPeer(p)
	if err != nil {
Steven Allen's avatar
Steven Allen committed
665
		logger.Errorw("failed to validate if peer is a DHT peer", "peer", p, "error", err)
Aarsh Shah's avatar
Aarsh Shah committed
666
	} else if b {
Aarsh Shah's avatar
Aarsh Shah committed
667 668 669
		select {
		case dht.addPeerToRTChan <- addPeerRTReq{p, queryPeer}:
		case <-dht.ctx.Done():
Steven Allen's avatar
Steven Allen committed
670 671
			return
		}
Aarsh Shah's avatar
Aarsh Shah committed
672
	}
Aarsh Shah's avatar
Aarsh Shah committed
673 674
}

Aarsh Shah's avatar
Aarsh Shah committed
675
// peerStoppedDHT signals the routing table that a peer is unable to responsd to DHT queries anymore.
Aarsh Shah's avatar
Aarsh Shah committed
676
func (dht *IpfsDHT) peerStoppedDHT(ctx context.Context, p peer.ID) {
Steven Allen's avatar
Steven Allen committed
677
	logger.Debugw("peer stopped dht", "peer", p)
Aarsh Shah's avatar
Aarsh Shah committed
678 679
	// A peer that does not support the DHT protocol is dead for us.
	// There's no point in talking to anymore till it starts supporting the DHT protocol again.
Aarsh Shah's avatar
Aarsh Shah committed
680
	dht.routingTable.RemovePeer(p)
Aarsh Shah's avatar
Aarsh Shah committed
681 682
}

683 684 685 686
func (dht *IpfsDHT) fixRTIfNeeded() {
	select {
	case dht.fixLowPeersChan <- struct{}{}:
	default:
Aarsh Shah's avatar
Aarsh Shah committed
687
	}
688
}
Jeromy's avatar
Jeromy committed
689

Jeromy's avatar
Jeromy committed
690
// FindLocal looks for a peer with a given ID connected to this dht and returns the peer and the table it was found in.
691
func (dht *IpfsDHT) FindLocal(id peer.ID) peer.AddrInfo {
692
	switch dht.host.Network().Connectedness(id) {
693
	case network.Connected, network.CanConnect:
694 695
		return dht.peerstore.PeerInfo(id)
	default:
696
		return peer.AddrInfo{}
Jeromy's avatar
Jeromy committed
697 698
	}
}
699

Jeromy's avatar
Jeromy committed
700
// findPeerSingle asks peer 'p' if they know where the peer with id 'id' is
701
func (dht *IpfsDHT) findPeerSingle(ctx context.Context, p peer.ID, id peer.ID) (*pb.Message, error) {
702
	pmes := pb.NewMessage(pb.Message_FIND_NODE, []byte(id), 0)
Steven Allen's avatar
Steven Allen committed
703
	return dht.sendRequest(ctx, p, pmes)
704
}
705

Adin Schmahmann's avatar
Adin Schmahmann committed
706 707
func (dht *IpfsDHT) findProvidersSingle(ctx context.Context, p peer.ID, key multihash.Multihash) (*pb.Message, error) {
	pmes := pb.NewMessage(pb.Message_GET_PROVIDERS, key, 0)
Steven Allen's avatar
Steven Allen committed
708
	return dht.sendRequest(ctx, p, pmes)
Jeromy's avatar
Jeromy committed
709 710
}

711
// nearestPeersToQuery returns the routing tables closest peers.
712
func (dht *IpfsDHT) nearestPeersToQuery(pmes *pb.Message, count int) []peer.ID {
713
	closer := dht.routingTable.NearestPeers(kb.ConvertKey(string(pmes.GetKey())), count)
714 715 716
	return closer
}

Aarsh Shah's avatar
Aarsh Shah committed
717
// betterPeersToQuery returns nearestPeersToQuery with some additional filtering
718
func (dht *IpfsDHT) betterPeersToQuery(pmes *pb.Message, from peer.ID, count int) []peer.ID {
719
	closer := dht.nearestPeersToQuery(pmes, count)
720 721 722

	// no node? nil
	if closer == nil {
Steven Allen's avatar
Steven Allen committed
723
		logger.Infow("no closer peers to send", from)
724 725 726
		return nil
	}

Steven Allen's avatar
Steven Allen committed
727
	filtered := make([]peer.ID, 0, len(closer))
Jeromy's avatar
Jeromy committed
728 729 730
	for _, clp := range closer {

		// == to self? thats bad
Jeromy's avatar
Jeromy committed
731
		if clp == dht.self {
Matt Joiner's avatar
Matt Joiner committed
732
			logger.Error("BUG betterPeersToQuery: attempted to return self! this shouldn't happen...")
733 734
			return nil
		}
735
		// Dont send a peer back themselves
736
		if clp == from {
737 738 739
			continue
		}

Jeromy's avatar
Jeromy committed
740
		filtered = append(filtered, clp)
741 742
	}

743 744
	// ok seems like closer nodes
	return filtered
745 746
}

747 748 749 750 751 752 753 754 755 756 757 758 759 760 761 762 763 764
func (dht *IpfsDHT) setMode(m mode) error {
	dht.modeLk.Lock()
	defer dht.modeLk.Unlock()

	if m == dht.mode {
		return nil
	}

	switch m {
	case modeServer:
		return dht.moveToServerMode()
	case modeClient:
		return dht.moveToClientMode()
	default:
		return fmt.Errorf("unrecognized dht mode: %d", m)
	}
}

Adin Schmahmann's avatar
Adin Schmahmann committed
765 766 767
// moveToServerMode advertises (via libp2p identify updates) that we are able to respond to DHT queries and sets the appropriate stream handlers.
// Note: We may support responding to queries with protocols aside from our primary ones in order to support
// interoperability with older versions of the DHT protocol.
768 769
func (dht *IpfsDHT) moveToServerMode() error {
	dht.mode = modeServer
Adin Schmahmann's avatar
Adin Schmahmann committed
770
	for _, p := range dht.serverProtocols {
771 772 773 774 775
		dht.host.SetStreamHandler(p, dht.handleNewStream)
	}
	return nil
}

Adin Schmahmann's avatar
Adin Schmahmann committed
776 777 778 779 780
// moveToClientMode stops advertising (and rescinds advertisements via libp2p identify updates) that we are able to
// respond to DHT queries and removes the appropriate stream handlers. We also kill all inbound streams that were
// utilizing the handled protocols.
// Note: We may support responding to queries with protocols aside from our primary ones in order to support
// interoperability with older versions of the DHT protocol.
781 782
func (dht *IpfsDHT) moveToClientMode() error {
	dht.mode = modeClient
Adin Schmahmann's avatar
Adin Schmahmann committed
783
	for _, p := range dht.serverProtocols {
784 785 786 787
		dht.host.RemoveStreamHandler(p)
	}

	pset := make(map[protocol.ID]bool)
Adin Schmahmann's avatar
Adin Schmahmann committed
788
	for _, p := range dht.serverProtocols {
789 790 791 792 793 794 795
		pset[p] = true
	}

	for _, c := range dht.host.Network().Conns() {
		for _, s := range c.GetStreams() {
			if pset[s.Protocol()] {
				if s.Stat().Direction == network.DirInbound {
Steven Allen's avatar
Steven Allen committed
796
					_ = s.Reset()
797 798 799 800 801 802 803 804 805 806 807 808 809
				}
			}
		}
	}
	return nil
}

func (dht *IpfsDHT) getMode() mode {
	dht.modeLk.Lock()
	defer dht.modeLk.Unlock()
	return dht.mode
}

Alan Shaw's avatar
Alan Shaw committed
810
// Context returns the DHT's context.
811 812 813 814
func (dht *IpfsDHT) Context() context.Context {
	return dht.ctx
}

Alan Shaw's avatar
Alan Shaw committed
815
// Process returns the DHT's process.
816 817 818 819
func (dht *IpfsDHT) Process() goprocess.Process {
	return dht.proc
}

Alan Shaw's avatar
Alan Shaw committed
820
// RoutingTable returns the DHT's routingTable.
ZhengQi's avatar
ZhengQi committed
821 822 823 824
func (dht *IpfsDHT) RoutingTable() *kb.RoutingTable {
	return dht.routingTable
}

Alan Shaw's avatar
Alan Shaw committed
825
// Close calls Process Close.
826 827 828
func (dht *IpfsDHT) Close() error {
	return dht.proc.Close()
}
829 830 831 832

func mkDsKey(s string) ds.Key {
	return ds.NewKey(base32.RawStdEncoding.EncodeToString([]byte(s)))
}
833

Alan Shaw's avatar
Alan Shaw committed
834
// PeerID returns the DHT node's Peer ID.
835 836 837 838
func (dht *IpfsDHT) PeerID() peer.ID {
	return dht.self
}

Alan Shaw's avatar
Alan Shaw committed
839
// PeerKey returns a DHT key, converted from the DHT node's Peer ID.
840 841 842 843
func (dht *IpfsDHT) PeerKey() []byte {
	return kb.ConvertPeerID(dht.self)
}

Alan Shaw's avatar
Alan Shaw committed
844
// Host returns the libp2p host this DHT is operating with.
845 846 847 848
func (dht *IpfsDHT) Host() host.Host {
	return dht.host
}

Alan Shaw's avatar
Alan Shaw committed
849
// Ping sends a ping message to the passed peer and waits for a response.
850 851 852 853
func (dht *IpfsDHT) Ping(ctx context.Context, p peer.ID) error {
	req := pb.NewMessage(pb.Message_PING, nil, 0)
	resp, err := dht.sendRequest(ctx, p, req)
	if err != nil {
Steven Allen's avatar
Steven Allen committed
854
		return fmt.Errorf("sending request: %w", err)
855 856
	}
	if resp.Type != pb.Message_PING {
Steven Allen's avatar
Steven Allen committed
857
		return fmt.Errorf("got unexpected response type: %v", resp.Type)
858 859 860
	}
	return nil
}
861 862 863 864 865 866 867 868 869 870 871 872 873 874 875 876

// newContextWithLocalTags returns a new context.Context with the InstanceID and
// PeerID keys populated. It will also take any extra tags that need adding to
// the context as tag.Mutators.
func (dht *IpfsDHT) newContextWithLocalTags(ctx context.Context, extraTags ...tag.Mutator) context.Context {
	extraTags = append(
		extraTags,
		tag.Upsert(metrics.KeyPeerID, dht.self.Pretty()),
		tag.Upsert(metrics.KeyInstanceID, fmt.Sprintf("%p", dht)),
	)
	ctx, _ = tag.New(
		ctx,
		extraTags...,
	) // ignoring error as it is unrelated to the actual function of this code.
	return ctx
}
877 878 879 880 881 882 883 884

func (dht *IpfsDHT) maybeAddAddrs(p peer.ID, addrs []ma.Multiaddr, ttl time.Duration) {
	// Don't add addresses for self or our connected peers. We have better ones.
	if p == dht.self || dht.host.Network().Connectedness(p) == network.Connected {
		return
	}
	dht.peerstore.AddAddrs(p, addrs, ttl)
}