dht.go 23.7 KB
Newer Older
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
1 2
package dht

3
import (
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
4
	"bytes"
Jeromy's avatar
Jeromy committed
5
	"context"
6
	"errors"
7
	"fmt"
Aarsh Shah's avatar
Aarsh Shah committed
8
	"math"
9
	"math/rand"
10 11
	"sync"
	"time"
12

13 14 15 16 17 18
	"github.com/libp2p/go-libp2p-core/host"
	"github.com/libp2p/go-libp2p-core/network"
	"github.com/libp2p/go-libp2p-core/peer"
	"github.com/libp2p/go-libp2p-core/peerstore"
	"github.com/libp2p/go-libp2p-core/protocol"
	"github.com/libp2p/go-libp2p-core/routing"
19
	"go.uber.org/zap"
20

21
	"go.opencensus.io/tag"
22

23
	"github.com/libp2p/go-libp2p-kad-dht/metrics"
24
	pb "github.com/libp2p/go-libp2p-kad-dht/pb"
Aarsh Shah's avatar
Aarsh Shah committed
25
	"github.com/libp2p/go-libp2p-kad-dht/providers"
Aarsh Shah's avatar
Aarsh Shah committed
26 27 28
	kb "github.com/libp2p/go-libp2p-kbucket"
	record "github.com/libp2p/go-libp2p-record"
	recpb "github.com/libp2p/go-libp2p-record/pb"
29

Aarsh Shah's avatar
Aarsh Shah committed
30
	"github.com/gogo/protobuf/proto"
31 32
	ds "github.com/ipfs/go-datastore"
	logging "github.com/ipfs/go-log"
Aarsh Shah's avatar
Aarsh Shah committed
33
	"github.com/jbenet/goprocess"
Henrique Dias's avatar
Henrique Dias committed
34
	goprocessctx "github.com/jbenet/goprocess/context"
Steven Allen's avatar
Steven Allen committed
35
	"github.com/multiformats/go-base32"
36
	ma "github.com/multiformats/go-multiaddr"
Adin Schmahmann's avatar
Adin Schmahmann committed
37
	"github.com/multiformats/go-multihash"
38 39
)

40 41 42 43
// logger is the package-wide sugared logger registered under the "dht" name.
var logger = logging.Logger("dht")

// baseLogger is the desugared counterpart of logger, for call sites that want
// to check the level before building structured fields.
var baseLogger = logger.Desugar()
44

Alan Shaw's avatar
Alan Shaw committed
45 46
// BaseConnMgrScore is the base value of the "kbucket" tag set on the
// connection manager. The score actually applied to a peer is this base plus
// the length of the prefix that peer's ID shares with ours.
const BaseConnMgrScore = 5

49 50 51
// mode is the DHT's current internal operating mode. The valid values start
// at 1, so the zero value matches neither constant.
type mode int

const (
	// modeServer: stream handlers for the server protocols are installed.
	modeServer mode = 1
	// modeClient: no DHT stream handlers are installed.
	modeClient mode = 2
)

Adin Schmahmann's avatar
Adin Schmahmann committed
56 57 58 59 60
// Protocol ID suffixes for the two DHT wire protocol versions. They are
// appended to the configured protocol prefix when the DHT is constructed.
const kad1 protocol.ID = "/kad/1.0.0"
const kad2 protocol.ID = "/kad/2.0.0"

61
// IpfsDHT is an implementation of Kademlia with S/Kademlia modifications.
62
// It is used to implement the base Routing module.
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
63
type IpfsDHT struct {
64 65 66
	host      host.Host           // the network services we need
	self      peer.ID             // Local peer (yourself)
	peerstore peerstore.Peerstore // Peer Registry
67

68
	datastore ds.Datastore // Local data
69

70
	routingTable *kb.RoutingTable // Array of routing tables for differently distanced nodes
71 72
	// ProviderManager stores & manages the provider records for this Dht peer.
	ProviderManager *providers.ProviderManager
73

Aarsh Shah's avatar
Aarsh Shah committed
74
	birth time.Time // When this peer started up
75

76
	Validator record.Validator
77

78 79
	ctx  context.Context
	proc goprocess.Process
80 81 82

	strmap map[peer.ID]*messageSender
	smlk   sync.Mutex
83

Steven Allen's avatar
Steven Allen committed
84
	plk sync.Mutex
85

86 87
	stripedPutLocks [256]sync.Mutex

88 89
	// DHT protocols we query with. We'll only add peers to our routing
	// table if they speak these protocols.
90 91
	protocols     []protocol.ID
	protocolsStrs []string
Adin Schmahmann's avatar
Adin Schmahmann committed
92

93
	// DHT protocols we can respond to.
Adin Schmahmann's avatar
Adin Schmahmann committed
94
	serverProtocols []protocol.ID
95

96
	auto   ModeOpt
97 98 99
	mode   mode
	modeLk sync.Mutex

100
	bucketSize int
101
	alpha      int // The concurrency parameter per path
Adin Schmahmann's avatar
Adin Schmahmann committed
102
	beta       int // The number of peers closest to a target that must have responded for a query path to terminate
103

104 105 106
	queryPeerFilter        QueryFilterFunc
	routingTablePeerFilter RouteTableFilterFunc

107 108
	autoRefresh           bool
	rtRefreshQueryTimeout time.Duration
Aarsh Shah's avatar
Aarsh Shah committed
109
	rtRefreshInterval     time.Duration
Steven Allen's avatar
Steven Allen committed
110
	triggerRtRefresh      chan chan<- error
Aarsh Shah's avatar
Aarsh Shah committed
111
	triggerSelfLookup     chan chan<- error
Aarsh Shah's avatar
Aarsh Shah committed
112

113 114 115 116 117
	// A set of bootstrap peers to fallback on if all other attempts to fix
	// the routing table fail (or, e.g., this is the first time this node is
	// connecting to the network).
	bootstrapPeers []peer.AddrInfo

Aarsh Shah's avatar
Aarsh Shah committed
118
	maxRecordAge time.Duration
119

120 121 122
	// Allows disabling dht subsystems. These should _only_ be set on
	// "forked" DHTs (e.g., DHTs with custom protocols and/or private
	// networks).
123
	enableProviders, enableValues bool
124

Aarsh Shah's avatar
Aarsh Shah committed
125 126 127
	// successfulOutboundQueryGracePeriod is the maximum grace period we will give to a peer
	// to between two successful query responses from it, failing which,
	// we will ping it to see if it's alive.
128
	successfulOutboundQueryGracePeriod time.Duration
129 130

	fixLowPeersChan chan struct{}
131 132
}

Matt Joiner's avatar
Matt Joiner committed
133 134 135 136
// Assert that IPFS assumptions about interfaces aren't broken. These aren't a
// guarantee, but we can use them to aid refactoring.
var (
	_ routing.ContentRouting = (*IpfsDHT)(nil)
137
	_ routing.Routing        = (*IpfsDHT)(nil)
Matt Joiner's avatar
Matt Joiner committed
138 139 140 141 142
	_ routing.PeerRouting    = (*IpfsDHT)(nil)
	_ routing.PubKeyFetcher  = (*IpfsDHT)(nil)
	_ routing.ValueStore     = (*IpfsDHT)(nil)
)

143
// New creates a new DHT with the specified host and options.
Aarsh Shah's avatar
Aarsh Shah committed
144 145 146
// Please note that being connected to a DHT peer does not necessarily imply that it's also in the DHT Routing Table.
// If the Routing Table has more than "minRTRefreshThreshold" peers, we consider a peer as a Routing Table candidate ONLY when
// we successfully get a query response from it OR if it send us a query.
147 148
func New(ctx context.Context, h host.Host, options ...Option) (*IpfsDHT, error) {
	var cfg config
Adin Schmahmann's avatar
Adin Schmahmann committed
149
	if err := cfg.apply(append([]Option{defaults}, options...)...); err != nil {
150 151
		return nil, err
	}
152 153 154
	if err := cfg.applyFallbacks(h); err != nil {
		return nil, err
	}
Aarsh Shah's avatar
Aarsh Shah committed
155

Adin Schmahmann's avatar
Adin Schmahmann committed
156 157
	if err := cfg.validate(); err != nil {
		return nil, err
158
	}
Aarsh Shah's avatar
Aarsh Shah committed
159 160 161 162
	dht, err := makeDHT(ctx, h, cfg)
	if err != nil {
		return nil, fmt.Errorf("failed to create DHT, err=%s", err)
	}
Adin Schmahmann's avatar
Adin Schmahmann committed
163

164
	dht.autoRefresh = cfg.routingTable.autoRefresh
Aarsh Shah's avatar
Aarsh Shah committed
165
	dht.rtRefreshInterval = cfg.routingTable.refreshInterval
166
	dht.rtRefreshQueryTimeout = cfg.routingTable.refreshQueryTimeout
167

168 169 170
	dht.maxRecordAge = cfg.maxRecordAge
	dht.enableProviders = cfg.enableProviders
	dht.enableValues = cfg.enableValues
Aarsh Shah's avatar
Aarsh Shah committed
171

172
	dht.Validator = cfg.validator
173

174
	dht.auto = cfg.mode
175
	switch cfg.mode {
176
	case ModeAuto, ModeClient:
177
		dht.mode = modeClient
178
	case ModeAutoServer, ModeServer:
179 180
		dht.mode = modeServer
	default:
181
		return nil, fmt.Errorf("invalid dht mode %d", cfg.mode)
182 183 184 185 186
	}

	if dht.mode == modeServer {
		if err := dht.moveToServerMode(); err != nil {
			return nil, err
187
		}
188
	}
189 190 191 192 193 194 195 196 197 198

	// register for event bus and network notifications
	sn, err := newSubscriberNotifiee(dht)
	if err != nil {
		return nil, err
	}
	dht.proc.Go(sn.subscribe)
	// handle providers
	dht.proc.AddChild(dht.ProviderManager.Process())

Aarsh Shah's avatar
Aarsh Shah committed
199
	dht.startSelfLookup()
200
	dht.startRefreshing()
Aarsh Shah's avatar
Aarsh Shah committed
201 202 203 204

	// go-routine to make sure we ALWAYS have RT peer addresses in the peerstore
	// since RT membership is decoupled from connectivity
	go dht.persistRTPeersInPeerStore()
205 206 207 208

	// listens to the fix low peers chan and tries to fix the Routing Table
	dht.proc.Go(dht.fixLowPeersRoutine)

209 210
	return dht, nil
}
211

212 213 214 215
// NewDHT creates a new DHT object with the given peer as the 'local' host.
// IpfsDHT's initialized with this function will respond to DHT requests,
// whereas IpfsDHT's initialized with NewDHTClient will not.
func NewDHT(ctx context.Context, h host.Host, dstore ds.Batching) *IpfsDHT {
216
	dht, err := New(ctx, h, Datastore(dstore))
217 218 219 220 221 222 223 224 225 226 227
	if err != nil {
		panic(err)
	}
	return dht
}

// NewDHTClient creates a new DHT object with the given peer as the 'local'
// host. IpfsDHT clients initialized with this function will not respond to DHT
// requests. If you need a peer to respond to DHT requests, use NewDHT instead.
// NewDHTClient creates a new DHT object with the given peer as the 'local' host
func NewDHTClient(ctx context.Context, h host.Host, dstore ds.Batching) *IpfsDHT {
Adin Schmahmann's avatar
Adin Schmahmann committed
228
	dht, err := New(ctx, h, Datastore(dstore), Mode(ModeClient))
229 230 231
	if err != nil {
		panic(err)
	}
232 233 234
	return dht
}

Aarsh Shah's avatar
Aarsh Shah committed
235
func makeDHT(ctx context.Context, h host.Host, cfg config) (*IpfsDHT, error) {
236
	var protocols, serverProtocols []protocol.ID
Adin Schmahmann's avatar
Adin Schmahmann committed
237 238

	// check if custom test protocols were set
239
	if cfg.v1CompatibleMode {
240 241 242 243 244 245 246 247 248 249 250 251 252 253
		// In compat mode, query/serve using the old protocol.
		//
		// DO NOT accept requests on the new protocol. Otherwise:
		// 1. We'll end up in V2 routing tables.
		// 2. We'll have V1 peers in our routing table.
		//
		// In other words, we'll pollute the V2 network.
		protocols = []protocol.ID{cfg.protocolPrefix + kad1}
		serverProtocols = []protocol.ID{cfg.protocolPrefix + kad1}
	} else {
		// In v2 mode, serve on both protocols, but only
		// query/accept peers in v2 mode.
		protocols = []protocol.ID{cfg.protocolPrefix + kad2}
		serverProtocols = []protocol.ID{cfg.protocolPrefix + kad2, cfg.protocolPrefix + kad1}
Adin Schmahmann's avatar
Adin Schmahmann committed
254 255
	}

256
	dht := &IpfsDHT{
257 258 259 260 261 262 263
		datastore:              cfg.datastore,
		self:                   h.ID(),
		peerstore:              h.Peerstore(),
		host:                   h,
		strmap:                 make(map[peer.ID]*messageSender),
		birth:                  time.Now(),
		protocols:              protocols,
264
		protocolsStrs:          protocol.ConvertToStrings(protocols),
265 266 267 268 269 270 271 272
		serverProtocols:        serverProtocols,
		bucketSize:             cfg.bucketSize,
		alpha:                  cfg.concurrency,
		beta:                   cfg.resiliency,
		triggerRtRefresh:       make(chan chan<- error),
		triggerSelfLookup:      make(chan chan<- error),
		queryPeerFilter:        cfg.queryPeerFilter,
		routingTablePeerFilter: cfg.routingTable.peerFilter,
273
		fixLowPeersChan:        make(chan struct{}, 1),
Jeromy's avatar
Jeromy committed
274
	}
275

Aarsh Shah's avatar
Aarsh Shah committed
276 277 278 279 280 281
	// construct routing table
	rt, err := makeRoutingTable(dht, cfg)
	if err != nil {
		return nil, fmt.Errorf("failed to construct routing table,err=%s", err)
	}
	dht.routingTable = rt
282
	dht.bootstrapPeers = cfg.bootstrapPeers
Aarsh Shah's avatar
Aarsh Shah committed
283

284 285
	// create a DHT proc with the given context
	dht.proc = goprocessctx.WithContext(ctx)
Aarsh Shah's avatar
Aarsh Shah committed
286 287 288 289 290 291

	// create a tagged context derived from the original context
	ctxTags := dht.newContextWithLocalTags(ctx)
	// the DHT context should be done when the process is closed
	dht.ctx = goprocessctx.WithProcessClosing(ctxTags, dht.proc)

Alan Shaw's avatar
Alan Shaw committed
292 293 294 295 296
	pm, err := providers.NewProviderManager(dht.ctx, h.ID(), cfg.datastore, cfg.providersOptions...)
	if err != nil {
		return nil, err
	}
	dht.ProviderManager = pm
297

Aarsh Shah's avatar
Aarsh Shah committed
298
	return dht, nil
Jeromy's avatar
Jeromy committed
299 300
}

Aarsh Shah's avatar
Aarsh Shah committed
301
func makeRoutingTable(dht *IpfsDHT, cfg config) (*kb.RoutingTable, error) {
Aarsh Shah's avatar
Aarsh Shah committed
302 303 304 305 306 307
	// The threshold is calculated based on the expected amount of time that should pass before we
	// query a peer as part of our refresh cycle.
	// To grok the Math Wizardy that produced these exact equations, please be patient as a document explaining it will
	// be published soon.
	l1 := math.Log(float64(1) / float64(defaultBucketSize))                              //(Log(1/K))
	l2 := math.Log(float64(1) - (float64(cfg.concurrency) / float64(defaultBucketSize))) // Log(1 - (alpha / K))
308
	maxLastSuccessfulOutboundThreshold := time.Duration(l1 / l2 * float64(cfg.routingTable.refreshInterval))
Aarsh Shah's avatar
Aarsh Shah committed
309

Aarsh Shah's avatar
Aarsh Shah committed
310
	self := kb.ConvertPeerID(dht.host.ID())
Aarsh Shah's avatar
Aarsh Shah committed
311

Aarsh Shah's avatar
Aarsh Shah committed
312
	rt, err := kb.NewRoutingTable(cfg.bucketSize, self, time.Minute, dht.host.Peerstore(), maxLastSuccessfulOutboundThreshold)
Aarsh Shah's avatar
Aarsh Shah committed
313
	dht.successfulOutboundQueryGracePeriod = maxLastSuccessfulOutboundThreshold
Aarsh Shah's avatar
Aarsh Shah committed
314
	cmgr := dht.host.ConnManager()
Aarsh Shah's avatar
Aarsh Shah committed
315 316 317 318 319 320 321

	rt.PeerAdded = func(p peer.ID) {
		commonPrefixLen := kb.CommonPrefixLen(self, kb.ConvertPeerID(p))
		cmgr.TagPeer(p, "kbucket", BaseConnMgrScore+commonPrefixLen)
	}
	rt.PeerRemoved = func(p peer.ID) {
		cmgr.UntagPeer(p, "kbucket")
322 323 324

		// try to fix the RT
		dht.fixRTIfNeeded()
Aarsh Shah's avatar
Aarsh Shah committed
325 326 327 328
	}

	return rt, err
}
329

Will Scott's avatar
Will Scott committed
330 331 332 333 334
// Mode reports the mode option this DHT was configured with. This is the
// configured option, not the internal client/server state the DHT may
// currently be in.
func (dht *IpfsDHT) Mode() ModeOpt {
	configured := dht.auto
	return configured
}

335
// fixLowPeersRoutine tries to get more peers into the routing table if we're below the threshold
336
func (dht *IpfsDHT) fixLowPeersRoutine(proc goprocess.Process) {
337 338 339
	timer := time.NewTimer(periodicBootstrapInterval)
	defer timer.Stop()

340 341 342
	for {
		select {
		case <-dht.fixLowPeersChan:
343
		case <-timer.C:
344 345 346
		case <-proc.Closing():
			return
		}
347

348 349 350 351
		if dht.routingTable.Size() > minRTRefreshThreshold {
			continue
		}

352 353
		// we try to add all peers we are connected to to the Routing Table
		// in case they aren't already there.
354 355 356 357
		for _, p := range dht.host.Network().Peers() {
			dht.peerFound(dht.Context(), p, false)
		}

358 359 360 361 362 363 364 365 366 367 368 369 370 371 372 373 374 375 376 377 378 379 380 381 382 383 384 385 386 387 388 389 390 391 392 393 394 395 396 397 398
		// TODO Active Bootstrapping
		// We should first use non-bootstrap peers we knew of from previous
		// snapshots of the Routing Table before we connect to the bootstrappers.
		// See https://github.com/libp2p/go-libp2p-kad-dht/issues/387.
		if dht.routingTable.Size() == 0 {
			if len(dht.bootstrapPeers) == 0 {
				// No point in continuing, we have no peers!
				continue
			}

			found := 0
			for _, i := range rand.Perm(len(dht.bootstrapPeers)) {
				ai := dht.bootstrapPeers[i]
				err := dht.Host().Connect(dht.Context(), ai)
				if err == nil {
					found++
				} else {
					logger.Warnw("failed to bootstrap", "peer", ai.ID, "error", err)
				}

				// Wait for two bootstrap peers, or try them all.
				//
				// Why two? In theory, one should be enough
				// normally. However, if the network were to
				// restart and everyone connected to just one
				// bootstrapper, we'll end up with a mostly
				// partitioned network.
				//
				// So we always bootstrap with two random peers.
				if found == maxNBoostrappers {
					break
				}
			}
		}

		// if we still don't have peers in our routing table(probably because Identify hasn't completed),
		// there is no point in triggering a Refresh.
		if dht.routingTable.Size() == 0 {
			continue
		}

399 400 401 402 403 404 405 406 407 408
		if dht.autoRefresh {
			select {
			case dht.triggerRtRefresh <- nil:
			default:
			}
		}
	}

}

Aarsh Shah's avatar
Aarsh Shah committed
409 410 411 412 413 414 415 416 417 418 419 420 421 422 423 424 425 426 427
// persistRTPeersInPeerStore periodically refreshes the peerstore address TTLs
// of every peer in the routing table, until the DHT context is done. It ticks
// at a third of peerstore.RecentlyConnectedAddrTTL.
//
// TODO: this is a workaround and should be removed once
// https://github.com/libp2p/go-libp2p/issues/800 goes in.
func (dht *IpfsDHT) persistRTPeersInPeerStore() {
	refresh := time.NewTicker(peerstore.RecentlyConnectedAddrTTL / 3)
	defer refresh.Stop()

	for {
		select {
		case <-dht.ctx.Done():
			return
		case <-refresh.C:
		}

		for _, member := range dht.routingTable.ListPeers() {
			dht.peerstore.UpdateAddrs(member, peerstore.RecentlyConnectedAddrTTL, peerstore.RecentlyConnectedAddrTTL)
		}
	}
}

Jeromy's avatar
Jeromy committed
428
// putValueToPeer stores the given key/value pair at the peer 'p'
429 430
func (dht *IpfsDHT) putValueToPeer(ctx context.Context, p peer.ID, rec *recpb.Record) error {
	pmes := pb.NewMessage(pb.Message_PUT_VALUE, rec.Key, 0)
431
	pmes.Record = rec
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
432
	rpmes, err := dht.sendRequest(ctx, p, pmes)
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
433
	if err != nil {
Steven Allen's avatar
Steven Allen committed
434
		logger.Debugw("failed to put value to peer", "to", p, "key", loggableKeyBytes(rec.Key), "error", err)
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
435 436
		return err
	}
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
437

438
	if !bytes.Equal(rpmes.GetRecord().Value, pmes.GetRecord().Value) {
Steven Allen's avatar
Steven Allen committed
439
		logger.Infow("value not put correctly", "put-message", pmes, "get-message", rpmes)
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
440 441
		return errors.New("value not put correctly")
	}
gpestana's avatar
gpestana committed
442

Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
443
	return nil
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
444 445
}

446 447
// errInvalidRecord is the sentinel returned when a peer hands us a record
// that fails validation.
var errInvalidRecord = errors.New("received invalid record")

448 449
// getValueOrPeers queries a particular peer p for the value for
// key. It returns either the value or a list of closer peers.
450
// NOTE: It will update the dht's peerstore with any new addresses
451
// it finds for the given peer.
452
func (dht *IpfsDHT) getValueOrPeers(ctx context.Context, p peer.ID, key string) (*recpb.Record, []*peer.AddrInfo, error) {
453
	pmes, err := dht.getValueSingle(ctx, p, key)
454
	if err != nil {
455
		return nil, nil, err
456 457
	}

458 459 460
	// Perhaps we were given closer peers
	peers := pb.PBPeersToPeerInfos(pmes.GetCloserPeers())

461
	if record := pmes.GetRecord(); record != nil {
462
		// Success! We were given the value
Steven Allen's avatar
Steven Allen committed
463
		logger.Debug("got value")
464

465
		// make sure record is valid.
466
		err = dht.Validator.Validate(string(record.GetKey()), record.GetValue())
467
		if err != nil {
Steven Allen's avatar
Steven Allen committed
468
			logger.Debug("received invalid record (discarded)")
469 470
			// return a sentinal to signify an invalid record was received
			err = errInvalidRecord
George Antoniadis's avatar
George Antoniadis committed
471
			record = new(recpb.Record)
472
		}
473
		return record, peers, err
474
	}
475

476 477 478 479
	if len(peers) > 0 {
		return nil, peers, nil
	}

480
	return nil, nil, routing.ErrNotFound
481 482
}

483
// getValueSingle simply performs the get value RPC with the given parameters
484
func (dht *IpfsDHT) getValueSingle(ctx context.Context, p peer.ID, key string) (*pb.Message, error) {
485
	pmes := pb.NewMessage(pb.Message_GET_VALUE, []byte(key), 0)
Steven Allen's avatar
Steven Allen committed
486
	return dht.sendRequest(ctx, p, pmes)
Jeromy's avatar
Jeromy committed
487 488
}

489
// getLocal attempts to retrieve the value from the datastore
490
func (dht *IpfsDHT) getLocal(key string) (*recpb.Record, error) {
Steven Allen's avatar
Steven Allen committed
491 492
	logger.Debugw("finding value in datastore", "key", loggableKeyString(key))

493
	rec, err := dht.getRecordFromDatastore(mkDsKey(key))
494
	if err != nil {
Steven Allen's avatar
Steven Allen committed
495
		logger.Warnw("get local failed", "key", key, "error", err)
496 497
		return nil, err
	}
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
498

499
	// Double check the key. Can't hurt.
500
	if rec != nil && string(rec.GetKey()) != key {
Steven Allen's avatar
Steven Allen committed
501
		logger.Errorw("BUG: found a DHT record that didn't match it's key", "expected", key, "got", rec.GetKey())
Steven Allen's avatar
Steven Allen committed
502
		return nil, nil
503 504

	}
505
	return rec, nil
506 507
}

508
// putLocal stores the key value pair in the datastore
509
func (dht *IpfsDHT) putLocal(key string, rec *recpb.Record) error {
510 511
	data, err := proto.Marshal(rec)
	if err != nil {
Steven Allen's avatar
Steven Allen committed
512
		logger.Warnw("failed to put marshal record for local put", "error", err, "key", key)
513 514 515
		return err
	}

516
	return dht.datastore.Put(mkDsKey(key), data)
517
}
518

Aarsh Shah's avatar
Aarsh Shah committed
519
// peerFound signals the routingTable that we've found a peer that
Aarsh Shah's avatar
Aarsh Shah committed
520
// might support the DHT protocol.
Aarsh Shah's avatar
Aarsh Shah committed
521 522 523 524 525 526 527 528 529 530 531
// If we have a connection a peer but no exchange of a query RPC ->
//    LastQueriedAt=time.Now (so we don't ping it for some time for a liveliness check)
//    LastUsefulAt=N/A
// If we connect to a peer and exchange a query RPC ->
//    LastQueriedAt=time.Now (same reason as above)
//    LastUsefulAt=time.Now (so we give it some life in the RT without immediately evicting it)
// If we query a peer we already have in our Routing Table ->
//    LastQueriedAt=time.Now()
//    LastUsefulAt remains unchanged
// If we connect to a peer we already have in the RT but do not exchange a query (rare)
//    Do Nothing.
Aarsh Shah's avatar
Aarsh Shah committed
532
func (dht *IpfsDHT) peerFound(ctx context.Context, p peer.ID, queryPeer bool) {
533 534 535
	if c := baseLogger.Check(zap.DebugLevel, "peer found"); c != nil {
		c.Write(zap.String("peer", p.String()))
	}
Aarsh Shah's avatar
Aarsh Shah committed
536 537
	b, err := dht.validRTPeer(p)
	if err != nil {
Steven Allen's avatar
Steven Allen committed
538
		logger.Errorw("failed to validate if peer is a DHT peer", "peer", p, "error", err)
Aarsh Shah's avatar
Aarsh Shah committed
539
	} else if b {
Aarsh Shah's avatar
Aarsh Shah committed
540
		newlyAdded, err := dht.routingTable.TryAddPeer(p, queryPeer)
Steven Allen's avatar
Steven Allen committed
541 542 543 544
		if err != nil {
			// peer not added.
			return
		}
Aarsh Shah's avatar
Aarsh Shah committed
545

Aarsh Shah's avatar
Aarsh Shah committed
546
		// If we freshly added the peer because of a query, we need to ensure we override the "zero" lastUsefulAt
Aarsh Shah's avatar
Aarsh Shah committed
547
		// value that must have been set in the Routing Table for this peer when it was first added during a connection.
Aarsh Shah's avatar
Aarsh Shah committed
548 549 550 551 552 553
		if newlyAdded && queryPeer {
			dht.routingTable.UpdateLastUsefulAt(p, time.Now())
		} else if queryPeer {
			// the peer is already in our RT, but we just successfully queried it and so let's give it a
			// bump on the query time so we don't ping it too soon for a liveliness check.
			dht.routingTable.UpdateLastSuccessfulOutboundQueryAt(p, time.Now())
Aarsh Shah's avatar
Aarsh Shah committed
554 555
		}
	}
Aarsh Shah's avatar
Aarsh Shah committed
556 557
}

Aarsh Shah's avatar
Aarsh Shah committed
558
// peerStoppedDHT signals the routing table that a peer is unable to responsd to DHT queries anymore.
Aarsh Shah's avatar
Aarsh Shah committed
559
func (dht *IpfsDHT) peerStoppedDHT(ctx context.Context, p peer.ID) {
Steven Allen's avatar
Steven Allen committed
560
	logger.Debugw("peer stopped dht", "peer", p)
Aarsh Shah's avatar
Aarsh Shah committed
561 562
	// A peer that does not support the DHT protocol is dead for us.
	// There's no point in talking to anymore till it starts supporting the DHT protocol again.
Aarsh Shah's avatar
Aarsh Shah committed
563
	dht.routingTable.RemovePeer(p)
Aarsh Shah's avatar
Aarsh Shah committed
564 565
}

566 567 568 569
func (dht *IpfsDHT) fixRTIfNeeded() {
	select {
	case dht.fixLowPeersChan <- struct{}{}:
	default:
Aarsh Shah's avatar
Aarsh Shah committed
570
	}
571
}
Jeromy's avatar
Jeromy committed
572

Jeromy's avatar
Jeromy committed
573
// FindLocal looks for a peer with a given ID connected to this dht and returns the peer and the table it was found in.
574
func (dht *IpfsDHT) FindLocal(id peer.ID) peer.AddrInfo {
575
	switch dht.host.Network().Connectedness(id) {
576
	case network.Connected, network.CanConnect:
577 578
		return dht.peerstore.PeerInfo(id)
	default:
579
		return peer.AddrInfo{}
Jeromy's avatar
Jeromy committed
580 581
	}
}
582

Jeromy's avatar
Jeromy committed
583
// findPeerSingle asks peer 'p' if they know where the peer with id 'id' is
584
func (dht *IpfsDHT) findPeerSingle(ctx context.Context, p peer.ID, id peer.ID) (*pb.Message, error) {
585
	pmes := pb.NewMessage(pb.Message_FIND_NODE, []byte(id), 0)
Steven Allen's avatar
Steven Allen committed
586
	return dht.sendRequest(ctx, p, pmes)
587
}
588

Adin Schmahmann's avatar
Adin Schmahmann committed
589 590
func (dht *IpfsDHT) findProvidersSingle(ctx context.Context, p peer.ID, key multihash.Multihash) (*pb.Message, error) {
	pmes := pb.NewMessage(pb.Message_GET_PROVIDERS, key, 0)
Steven Allen's avatar
Steven Allen committed
591
	return dht.sendRequest(ctx, p, pmes)
Jeromy's avatar
Jeromy committed
592 593
}

594
// nearestPeersToQuery returns the routing tables closest peers.
595
func (dht *IpfsDHT) nearestPeersToQuery(pmes *pb.Message, count int) []peer.ID {
596
	closer := dht.routingTable.NearestPeers(kb.ConvertKey(string(pmes.GetKey())), count)
597 598 599
	return closer
}

Aarsh Shah's avatar
Aarsh Shah committed
600
// betterPeersToQuery returns nearestPeersToQuery with some additional filtering
601
func (dht *IpfsDHT) betterPeersToQuery(pmes *pb.Message, from peer.ID, count int) []peer.ID {
602
	closer := dht.nearestPeersToQuery(pmes, count)
603 604 605

	// no node? nil
	if closer == nil {
Steven Allen's avatar
Steven Allen committed
606
		logger.Infow("no closer peers to send", from)
607 608 609
		return nil
	}

Steven Allen's avatar
Steven Allen committed
610
	filtered := make([]peer.ID, 0, len(closer))
Jeromy's avatar
Jeromy committed
611 612 613
	for _, clp := range closer {

		// == to self? thats bad
Jeromy's avatar
Jeromy committed
614
		if clp == dht.self {
Matt Joiner's avatar
Matt Joiner committed
615
			logger.Error("BUG betterPeersToQuery: attempted to return self! this shouldn't happen...")
616 617
			return nil
		}
618
		// Dont send a peer back themselves
619
		if clp == from {
620 621 622
			continue
		}

Jeromy's avatar
Jeromy committed
623
		filtered = append(filtered, clp)
624 625
	}

626 627
	// ok seems like closer nodes
	return filtered
628 629
}

630 631 632 633 634 635 636 637 638 639 640 641 642 643 644 645 646 647
// setMode switches the DHT to mode m while holding modeLk. Requesting the
// mode the DHT is already in is a no-op; an unknown mode yields an error.
func (dht *IpfsDHT) setMode(m mode) error {
	dht.modeLk.Lock()
	defer dht.modeLk.Unlock()

	if dht.mode == m {
		return nil
	}
	if m == modeServer {
		return dht.moveToServerMode()
	}
	if m == modeClient {
		return dht.moveToClientMode()
	}
	return fmt.Errorf("unrecognized dht mode: %d", m)
}

Adin Schmahmann's avatar
Adin Schmahmann committed
648 649 650
// moveToServerMode advertises (via libp2p identify updates) that we are able to respond to DHT queries and sets the appropriate stream handlers.
// Note: We may support responding to queries with protocols aside from our primary ones in order to support
// interoperability with older versions of the DHT protocol.
651 652
func (dht *IpfsDHT) moveToServerMode() error {
	dht.mode = modeServer
Adin Schmahmann's avatar
Adin Schmahmann committed
653
	for _, p := range dht.serverProtocols {
654 655 656 657 658
		dht.host.SetStreamHandler(p, dht.handleNewStream)
	}
	return nil
}

Adin Schmahmann's avatar
Adin Schmahmann committed
659 660 661 662 663
// moveToClientMode stops advertising (and rescinds advertisements via libp2p identify updates) that we are able to
// respond to DHT queries and removes the appropriate stream handlers. We also kill all inbound streams that were
// utilizing the handled protocols.
// Note: We may support responding to queries with protocols aside from our primary ones in order to support
// interoperability with older versions of the DHT protocol.
664 665
func (dht *IpfsDHT) moveToClientMode() error {
	dht.mode = modeClient
Adin Schmahmann's avatar
Adin Schmahmann committed
666
	for _, p := range dht.serverProtocols {
667 668 669 670
		dht.host.RemoveStreamHandler(p)
	}

	pset := make(map[protocol.ID]bool)
Adin Schmahmann's avatar
Adin Schmahmann committed
671
	for _, p := range dht.serverProtocols {
672 673 674 675 676 677 678
		pset[p] = true
	}

	for _, c := range dht.host.Network().Conns() {
		for _, s := range c.GetStreams() {
			if pset[s.Protocol()] {
				if s.Stat().Direction == network.DirInbound {
Steven Allen's avatar
Steven Allen committed
679
					_ = s.Reset()
680 681 682 683 684 685 686 687 688 689 690 691 692
				}
			}
		}
	}
	return nil
}

// getMode returns the DHT's current internal mode, read under modeLk.
func (dht *IpfsDHT) getMode() mode {
	dht.modeLk.Lock()
	current := dht.mode
	dht.modeLk.Unlock()
	return current
}

Alan Shaw's avatar
Alan Shaw committed
693
// Context returns the DHT's context.
694 695 696 697
func (dht *IpfsDHT) Context() context.Context {
	return dht.ctx
}

Alan Shaw's avatar
Alan Shaw committed
698
// Process returns the DHT's process.
699 700 701 702
func (dht *IpfsDHT) Process() goprocess.Process {
	return dht.proc
}

Alan Shaw's avatar
Alan Shaw committed
703
// RoutingTable returns the DHT's routingTable.
ZhengQi's avatar
ZhengQi committed
704 705 706 707
func (dht *IpfsDHT) RoutingTable() *kb.RoutingTable {
	return dht.routingTable
}

Alan Shaw's avatar
Alan Shaw committed
708
// Close calls Process Close.
709 710 711
func (dht *IpfsDHT) Close() error {
	return dht.proc.Close()
}
712 713 714 715

// mkDsKey converts a DHT record key into a datastore key by encoding the raw
// key bytes as unpadded standard base32.
func mkDsKey(s string) ds.Key {
	encoded := base32.RawStdEncoding.EncodeToString([]byte(s))
	return ds.NewKey(encoded)
}
716

Alan Shaw's avatar
Alan Shaw committed
717
// PeerID returns the DHT node's Peer ID.
718 719 720 721
func (dht *IpfsDHT) PeerID() peer.ID {
	return dht.self
}

Alan Shaw's avatar
Alan Shaw committed
722
// PeerKey returns a DHT key, converted from the DHT node's Peer ID.
723 724 725 726
func (dht *IpfsDHT) PeerKey() []byte {
	return kb.ConvertPeerID(dht.self)
}

Alan Shaw's avatar
Alan Shaw committed
727
// Host returns the libp2p host this DHT is operating with.
728 729 730 731
func (dht *IpfsDHT) Host() host.Host {
	return dht.host
}

Alan Shaw's avatar
Alan Shaw committed
732
// Ping sends a ping message to the passed peer and waits for a response.
733 734 735 736
func (dht *IpfsDHT) Ping(ctx context.Context, p peer.ID) error {
	req := pb.NewMessage(pb.Message_PING, nil, 0)
	resp, err := dht.sendRequest(ctx, p, req)
	if err != nil {
Steven Allen's avatar
Steven Allen committed
737
		return fmt.Errorf("sending request: %w", err)
738 739
	}
	if resp.Type != pb.Message_PING {
Steven Allen's avatar
Steven Allen committed
740
		return fmt.Errorf("got unexpected response type: %v", resp.Type)
741 742 743
	}
	return nil
}
744 745 746 747 748 749 750 751 752 753 754 755 756 757 758 759

// newContextWithLocalTags derives a context from ctx that carries the
// PeerID and InstanceID metric tags of this DHT. Any extraTags passed in are
// applied as well, before the two local tags.
func (dht *IpfsDHT) newContextWithLocalTags(ctx context.Context, extraTags ...tag.Mutator) context.Context {
	peerTag := tag.Upsert(metrics.KeyPeerID, dht.self.Pretty())
	// The instance is identified by the address of the DHT object.
	instanceTag := tag.Upsert(metrics.KeyInstanceID, fmt.Sprintf("%p", dht))

	mutators := append(extraTags, peerTag, instanceTag)

	// The error is ignored on purpose: it is unrelated to the actual
	// function of this code.
	tagged, _ := tag.New(ctx, mutators...)
	return tagged
}
760 761 762 763 764 765 766 767

// maybeAddAddrs records addrs for peer p in the peerstore with the given
// TTL, unless p is ourselves or a peer we are currently connected to; in
// those cases we already have better addresses.
func (dht *IpfsDHT) maybeAddAddrs(p peer.ID, addrs []ma.Multiaddr, ttl time.Duration) {
	if p == dht.self {
		return
	}
	if dht.host.Network().Connectedness(p) == network.Connected {
		return
	}
	dht.peerstore.AddAddrs(p, addrs, ttl)
}