dht.go 22 KB
Newer Older
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
1 2
package dht

3
import (
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
4
	"bytes"
Jeromy's avatar
Jeromy committed
5
	"context"
6
	"errors"
7
	"fmt"
Aarsh Shah's avatar
Aarsh Shah committed
8
	"math"
9 10
	"sync"
	"time"
11

12 13 14 15 16 17
	"github.com/libp2p/go-libp2p-core/host"
	"github.com/libp2p/go-libp2p-core/network"
	"github.com/libp2p/go-libp2p-core/peer"
	"github.com/libp2p/go-libp2p-core/peerstore"
	"github.com/libp2p/go-libp2p-core/protocol"
	"github.com/libp2p/go-libp2p-core/routing"
18
	"go.uber.org/zap"
19

20
	"go.opencensus.io/tag"
21

22
	"github.com/libp2p/go-libp2p-kad-dht/metrics"
23
	pb "github.com/libp2p/go-libp2p-kad-dht/pb"
Aarsh Shah's avatar
Aarsh Shah committed
24
	"github.com/libp2p/go-libp2p-kad-dht/providers"
Aarsh Shah's avatar
Aarsh Shah committed
25 26 27
	kb "github.com/libp2p/go-libp2p-kbucket"
	record "github.com/libp2p/go-libp2p-record"
	recpb "github.com/libp2p/go-libp2p-record/pb"
28

Aarsh Shah's avatar
Aarsh Shah committed
29
	"github.com/gogo/protobuf/proto"
30 31
	ds "github.com/ipfs/go-datastore"
	logging "github.com/ipfs/go-log"
Aarsh Shah's avatar
Aarsh Shah committed
32
	"github.com/jbenet/goprocess"
Henrique Dias's avatar
Henrique Dias committed
33
	goprocessctx "github.com/jbenet/goprocess/context"
Steven Allen's avatar
Steven Allen committed
34
	"github.com/multiformats/go-base32"
35
	ma "github.com/multiformats/go-multiaddr"
Adin Schmahmann's avatar
Adin Schmahmann committed
36
	"github.com/multiformats/go-multihash"
37 38
)

39 40 41 42
// Package-level loggers for the DHT.
var (
	// logger is the logger registered under the "dht" name.
	logger     = logging.Logger("dht")
	// baseLogger is the desugared form of logger; peerFound uses it for a
	// level-checked write so the fields are only built when debug is enabled.
	baseLogger = logger.Desugar()
)
43

Alan Shaw's avatar
Alan Shaw committed
44 45
// BaseConnMgrScore is the base of the score set on the connection manager "kbucket" tag.
// The score actually set is this base plus the common prefix length between the local peer ID and the tagged peer's ID.
Henrique Dias's avatar
Henrique Dias committed
46 47
const BaseConnMgrScore = 5

48 49 50
// mode is the operating mode the DHT is currently in.
type mode int

// Operating modes. Numbering starts at 1, so the zero value of mode matches
// neither constant.
const (
	modeServer mode = 1
	modeClient mode = 2
)

Adin Schmahmann's avatar
Adin Schmahmann committed
55 56 57 58 59
// Protocol ID suffixes for the two DHT protocol versions. makeDHT appends
// them to the configured protocol prefix to build the full protocol IDs.
const (
	kad1 protocol.ID = "/kad/1.0.0"
	kad2 protocol.ID = "/kad/2.0.0"
)

60
// IpfsDHT is an implementation of Kademlia with S/Kademlia modifications.
61
// It is used to implement the base Routing module.
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
62
type IpfsDHT struct {
63 64 65
	host      host.Host           // the network services we need
	self      peer.ID             // Local peer (yourself)
	peerstore peerstore.Peerstore // Peer Registry
66

67
	datastore ds.Datastore // Local data
68

69
	routingTable *kb.RoutingTable // Array of routing tables for differently distanced nodes
70 71
	// ProviderManager stores & manages the provider records for this Dht peer.
	ProviderManager *providers.ProviderManager
72

Aarsh Shah's avatar
Aarsh Shah committed
73
	birth time.Time // When this peer started up
74

75
	Validator record.Validator
76

77 78
	ctx  context.Context
	proc goprocess.Process
79 80 81

	strmap map[peer.ID]*messageSender
	smlk   sync.Mutex
82

Steven Allen's avatar
Steven Allen committed
83
	plk sync.Mutex
84

85 86
	stripedPutLocks [256]sync.Mutex

87 88
	// DHT protocols we query with. We'll only add peers to our routing
	// table if they speak these protocols.
Adin Schmahmann's avatar
Adin Schmahmann committed
89 90
	protocols []protocol.ID

91
	// DHT protocols we can respond to.
Adin Schmahmann's avatar
Adin Schmahmann committed
92
	serverProtocols []protocol.ID
93

94
	auto   ModeOpt
95 96 97
	mode   mode
	modeLk sync.Mutex

98
	bucketSize int
99
	alpha      int // The concurrency parameter per path
Adin Schmahmann's avatar
Adin Schmahmann committed
100
	beta       int // The number of peers closest to a target that must have responded for a query path to terminate
101

102 103 104
	queryPeerFilter        QueryFilterFunc
	routingTablePeerFilter RouteTableFilterFunc

105 106
	autoRefresh           bool
	rtRefreshQueryTimeout time.Duration
Aarsh Shah's avatar
Aarsh Shah committed
107
	rtRefreshInterval     time.Duration
Steven Allen's avatar
Steven Allen committed
108
	triggerRtRefresh      chan chan<- error
Aarsh Shah's avatar
Aarsh Shah committed
109
	triggerSelfLookup     chan chan<- error
Aarsh Shah's avatar
Aarsh Shah committed
110 111

	maxRecordAge time.Duration
112

113 114 115
	// Allows disabling dht subsystems. These should _only_ be set on
	// "forked" DHTs (e.g., DHTs with custom protocols and/or private
	// networks).
116
	enableProviders, enableValues bool
117

Aarsh Shah's avatar
Aarsh Shah committed
118 119 120
	// successfulOutboundQueryGracePeriod is the maximum grace period we will give to a peer
	// to between two successful query responses from it, failing which,
	// we will ping it to see if it's alive.
121
	successfulOutboundQueryGracePeriod time.Duration
122 123

	fixLowPeersChan chan struct{}
124 125
}

Matt Joiner's avatar
Matt Joiner committed
126 127 128 129
// Assert that IPFS assumptions about interfaces aren't broken. These aren't a
// guarantee, but we can use them to aid refactoring.
var (
	_ routing.ContentRouting = (*IpfsDHT)(nil)
130
	_ routing.Routing        = (*IpfsDHT)(nil)
Matt Joiner's avatar
Matt Joiner committed
131 132 133 134 135
	_ routing.PeerRouting    = (*IpfsDHT)(nil)
	_ routing.PubKeyFetcher  = (*IpfsDHT)(nil)
	_ routing.ValueStore     = (*IpfsDHT)(nil)
)

136
// New creates a new DHT with the specified host and options.
Aarsh Shah's avatar
Aarsh Shah committed
137 138 139
// Please note that being connected to a DHT peer does not necessarily imply that it's also in the DHT Routing Table.
// If the Routing Table has more than "minRTRefreshThreshold" peers, we consider a peer as a Routing Table candidate ONLY when
// we successfully get a query response from it OR if it send us a query.
140 141
func New(ctx context.Context, h host.Host, options ...Option) (*IpfsDHT, error) {
	var cfg config
Adin Schmahmann's avatar
Adin Schmahmann committed
142
	if err := cfg.apply(append([]Option{defaults}, options...)...); err != nil {
143 144
		return nil, err
	}
145 146 147
	if err := cfg.applyFallbacks(h); err != nil {
		return nil, err
	}
Aarsh Shah's avatar
Aarsh Shah committed
148

Adin Schmahmann's avatar
Adin Schmahmann committed
149 150
	if err := cfg.validate(); err != nil {
		return nil, err
151
	}
Aarsh Shah's avatar
Aarsh Shah committed
152 153 154 155
	dht, err := makeDHT(ctx, h, cfg)
	if err != nil {
		return nil, fmt.Errorf("failed to create DHT, err=%s", err)
	}
Adin Schmahmann's avatar
Adin Schmahmann committed
156

157
	dht.autoRefresh = cfg.routingTable.autoRefresh
Aarsh Shah's avatar
Aarsh Shah committed
158
	dht.rtRefreshInterval = cfg.routingTable.refreshInterval
159
	dht.rtRefreshQueryTimeout = cfg.routingTable.refreshQueryTimeout
160

161 162 163
	dht.maxRecordAge = cfg.maxRecordAge
	dht.enableProviders = cfg.enableProviders
	dht.enableValues = cfg.enableValues
Aarsh Shah's avatar
Aarsh Shah committed
164

165
	dht.Validator = cfg.validator
166

167
	dht.auto = cfg.mode
168
	switch cfg.mode {
169
	case ModeAuto, ModeClient:
170
		dht.mode = modeClient
171
	case ModeAutoServer, ModeServer:
172 173
		dht.mode = modeServer
	default:
174
		return nil, fmt.Errorf("invalid dht mode %d", cfg.mode)
175 176 177 178 179
	}

	if dht.mode == modeServer {
		if err := dht.moveToServerMode(); err != nil {
			return nil, err
180
		}
181
	}
182 183 184 185 186 187 188 189 190 191

	// register for event bus and network notifications
	sn, err := newSubscriberNotifiee(dht)
	if err != nil {
		return nil, err
	}
	dht.proc.Go(sn.subscribe)
	// handle providers
	dht.proc.AddChild(dht.ProviderManager.Process())

Aarsh Shah's avatar
Aarsh Shah committed
192
	dht.startSelfLookup()
193
	dht.startRefreshing()
Aarsh Shah's avatar
Aarsh Shah committed
194 195 196 197

	// go-routine to make sure we ALWAYS have RT peer addresses in the peerstore
	// since RT membership is decoupled from connectivity
	go dht.persistRTPeersInPeerStore()
198 199 200 201

	// listens to the fix low peers chan and tries to fix the Routing Table
	dht.proc.Go(dht.fixLowPeersRoutine)

202 203
	return dht, nil
}
204

205 206 207 208
// NewDHT creates a new DHT object with the given peer as the 'local' host.
// IpfsDHT's initialized with this function will respond to DHT requests,
// whereas IpfsDHT's initialized with NewDHTClient will not.
func NewDHT(ctx context.Context, h host.Host, dstore ds.Batching) *IpfsDHT {
209
	dht, err := New(ctx, h, Datastore(dstore))
210 211 212 213 214 215 216 217 218 219 220
	if err != nil {
		panic(err)
	}
	return dht
}

// NewDHTClient creates a new DHT object with the given peer as the 'local'
// host. IpfsDHT clients initialized with this function will not respond to DHT
// requests. If you need a peer to respond to DHT requests, use NewDHT instead.
// NewDHTClient creates a new DHT object with the given peer as the 'local' host
func NewDHTClient(ctx context.Context, h host.Host, dstore ds.Batching) *IpfsDHT {
Adin Schmahmann's avatar
Adin Schmahmann committed
221
	dht, err := New(ctx, h, Datastore(dstore), Mode(ModeClient))
222 223 224
	if err != nil {
		panic(err)
	}
225 226 227
	return dht
}

Aarsh Shah's avatar
Aarsh Shah committed
228
func makeDHT(ctx context.Context, h host.Host, cfg config) (*IpfsDHT, error) {
229
	var protocols, serverProtocols []protocol.ID
Adin Schmahmann's avatar
Adin Schmahmann committed
230 231

	// check if custom test protocols were set
232
	if cfg.v1CompatibleMode {
233 234 235 236 237 238 239 240 241 242 243 244 245 246
		// In compat mode, query/serve using the old protocol.
		//
		// DO NOT accept requests on the new protocol. Otherwise:
		// 1. We'll end up in V2 routing tables.
		// 2. We'll have V1 peers in our routing table.
		//
		// In other words, we'll pollute the V2 network.
		protocols = []protocol.ID{cfg.protocolPrefix + kad1}
		serverProtocols = []protocol.ID{cfg.protocolPrefix + kad1}
	} else {
		// In v2 mode, serve on both protocols, but only
		// query/accept peers in v2 mode.
		protocols = []protocol.ID{cfg.protocolPrefix + kad2}
		serverProtocols = []protocol.ID{cfg.protocolPrefix + kad2, cfg.protocolPrefix + kad1}
Adin Schmahmann's avatar
Adin Schmahmann committed
247 248
	}

249
	dht := &IpfsDHT{
250 251 252 253 254 255 256 257 258 259 260 261 262 263 264
		datastore:              cfg.datastore,
		self:                   h.ID(),
		peerstore:              h.Peerstore(),
		host:                   h,
		strmap:                 make(map[peer.ID]*messageSender),
		birth:                  time.Now(),
		protocols:              protocols,
		serverProtocols:        serverProtocols,
		bucketSize:             cfg.bucketSize,
		alpha:                  cfg.concurrency,
		beta:                   cfg.resiliency,
		triggerRtRefresh:       make(chan chan<- error),
		triggerSelfLookup:      make(chan chan<- error),
		queryPeerFilter:        cfg.queryPeerFilter,
		routingTablePeerFilter: cfg.routingTable.peerFilter,
265
		fixLowPeersChan:        make(chan struct{}),
Jeromy's avatar
Jeromy committed
266
	}
267

Aarsh Shah's avatar
Aarsh Shah committed
268 269 270 271 272 273 274
	// construct routing table
	rt, err := makeRoutingTable(dht, cfg)
	if err != nil {
		return nil, fmt.Errorf("failed to construct routing table,err=%s", err)
	}
	dht.routingTable = rt

275 276
	// create a DHT proc with the given context
	dht.proc = goprocessctx.WithContext(ctx)
Aarsh Shah's avatar
Aarsh Shah committed
277 278 279 280 281 282

	// create a tagged context derived from the original context
	ctxTags := dht.newContextWithLocalTags(ctx)
	// the DHT context should be done when the process is closed
	dht.ctx = goprocessctx.WithProcessClosing(ctxTags, dht.proc)

Alan Shaw's avatar
Alan Shaw committed
283 284 285 286 287
	pm, err := providers.NewProviderManager(dht.ctx, h.ID(), cfg.datastore, cfg.providersOptions...)
	if err != nil {
		return nil, err
	}
	dht.ProviderManager = pm
288

Aarsh Shah's avatar
Aarsh Shah committed
289
	return dht, nil
Jeromy's avatar
Jeromy committed
290 291
}

Aarsh Shah's avatar
Aarsh Shah committed
292
func makeRoutingTable(dht *IpfsDHT, cfg config) (*kb.RoutingTable, error) {
Aarsh Shah's avatar
Aarsh Shah committed
293 294 295 296 297 298
	// The threshold is calculated based on the expected amount of time that should pass before we
	// query a peer as part of our refresh cycle.
	// To grok the Math Wizardy that produced these exact equations, please be patient as a document explaining it will
	// be published soon.
	l1 := math.Log(float64(1) / float64(defaultBucketSize))                              //(Log(1/K))
	l2 := math.Log(float64(1) - (float64(cfg.concurrency) / float64(defaultBucketSize))) // Log(1 - (alpha / K))
299
	maxLastSuccessfulOutboundThreshold := time.Duration(l1 / l2 * float64(cfg.routingTable.refreshInterval))
Aarsh Shah's avatar
Aarsh Shah committed
300

Aarsh Shah's avatar
Aarsh Shah committed
301
	self := kb.ConvertPeerID(dht.host.ID())
Aarsh Shah's avatar
Aarsh Shah committed
302

Aarsh Shah's avatar
Aarsh Shah committed
303
	rt, err := kb.NewRoutingTable(cfg.bucketSize, self, time.Minute, dht.host.Peerstore(), maxLastSuccessfulOutboundThreshold)
Aarsh Shah's avatar
Aarsh Shah committed
304
	dht.successfulOutboundQueryGracePeriod = maxLastSuccessfulOutboundThreshold
Aarsh Shah's avatar
Aarsh Shah committed
305
	cmgr := dht.host.ConnManager()
Aarsh Shah's avatar
Aarsh Shah committed
306 307 308 309 310 311 312

	rt.PeerAdded = func(p peer.ID) {
		commonPrefixLen := kb.CommonPrefixLen(self, kb.ConvertPeerID(p))
		cmgr.TagPeer(p, "kbucket", BaseConnMgrScore+commonPrefixLen)
	}
	rt.PeerRemoved = func(p peer.ID) {
		cmgr.UntagPeer(p, "kbucket")
313 314 315

		// try to fix the RT
		dht.fixRTIfNeeded()
Aarsh Shah's avatar
Aarsh Shah committed
316 317 318 319
	}

	return rt, err
}
320

Will Scott's avatar
Will Scott committed
321 322 323 324 325
// Mode reports the mode option this DHT was configured with. This is the
// option recorded at construction time, not the internal client/server state
// the DHT may currently be in.
func (dht *IpfsDHT) Mode() ModeOpt {
	return dht.auto
}

326 327 328 329 330 331 332 333 334 335 336 337 338 339 340 341 342 343 344 345 346 347 348 349 350 351
// fixLowPeersRoutine tries to get more peers into the routing table whenever
// it is signalled on fixLowPeersChan and the table holds no more than
// minRTRefreshThreshold peers. It offers every peer the host is connected to
// as a routing table candidate and, when auto refresh is enabled, requests a
// routing table refresh without blocking. It returns once proc starts closing.
func (dht *IpfsDHT) fixLowPeersRoutine(proc goprocess.Process) {
	for {
		select {
		case <-proc.Closing():
			return
		case <-dht.fixLowPeersChan:
		}

		if dht.routingTable.Size() <= minRTRefreshThreshold {
			connected := dht.host.Network().Peers()
			for i := range connected {
				dht.peerFound(dht.Context(), connected[i], false)
			}

			if !dht.autoRefresh {
				continue
			}
			// Best effort: skip the request if nobody is ready to receive it.
			select {
			case dht.triggerRtRefresh <- nil:
			default:
			}
		}
	}
}

Aarsh Shah's avatar
Aarsh Shah committed
352 353 354 355 356 357 358 359 360 361 362 363 364 365 366 367 368 369 370
// persistRTPeersInPeerStore runs until the DHT context is done. Three times
// per peerstore.RecentlyConnectedAddrTTL it calls UpdateAddrs for every peer
// in the routing table, with that TTL as both the old and the new value.
//
// TODO: This is a stopgap and SHOULD be removed once
// https://github.com/libp2p/go-libp2p/issues/800 goes in.
func (dht *IpfsDHT) persistRTPeersInPeerStore() {
	ticker := time.NewTicker(peerstore.RecentlyConnectedAddrTTL / 3)
	defer ticker.Stop()

	for {
		select {
		case <-dht.ctx.Done():
			return
		case <-ticker.C:
			for _, p := range dht.routingTable.ListPeers() {
				dht.peerstore.UpdateAddrs(p, peerstore.RecentlyConnectedAddrTTL, peerstore.RecentlyConnectedAddrTTL)
			}
		}
	}
}

Jeromy's avatar
Jeromy committed
371
// putValueToPeer stores the given key/value pair at the peer 'p'
372 373
func (dht *IpfsDHT) putValueToPeer(ctx context.Context, p peer.ID, rec *recpb.Record) error {
	pmes := pb.NewMessage(pb.Message_PUT_VALUE, rec.Key, 0)
374
	pmes.Record = rec
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
375
	rpmes, err := dht.sendRequest(ctx, p, pmes)
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
376
	if err != nil {
Steven Allen's avatar
Steven Allen committed
377
		logger.Debugw("failed to put value to peer", "to", p, "key", loggableKeyBytes(rec.Key), "error", err)
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
378 379
		return err
	}
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
380

381
	if !bytes.Equal(rpmes.GetRecord().Value, pmes.GetRecord().Value) {
Steven Allen's avatar
Steven Allen committed
382
		logger.Infow("value not put correctly", "put-message", pmes, "get-message", rpmes)
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
383 384
		return errors.New("value not put correctly")
	}
gpestana's avatar
gpestana committed
385

Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
386
	return nil
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
387 388
}

389 390
// errInvalidRecord is the sentinel getValueOrPeers returns when the record
// it received fails validation.
var errInvalidRecord = errors.New("received invalid record")

391 392
// getValueOrPeers queries a particular peer p for the value for
// key. It returns either the value or a list of closer peers.
393
// NOTE: It will update the dht's peerstore with any new addresses
394
// it finds for the given peer.
395
func (dht *IpfsDHT) getValueOrPeers(ctx context.Context, p peer.ID, key string) (*recpb.Record, []*peer.AddrInfo, error) {
396
	pmes, err := dht.getValueSingle(ctx, p, key)
397
	if err != nil {
398
		return nil, nil, err
399 400
	}

401 402 403
	// Perhaps we were given closer peers
	peers := pb.PBPeersToPeerInfos(pmes.GetCloserPeers())

404
	if record := pmes.GetRecord(); record != nil {
405
		// Success! We were given the value
Steven Allen's avatar
Steven Allen committed
406
		logger.Debug("got value")
407

408
		// make sure record is valid.
409
		err = dht.Validator.Validate(string(record.GetKey()), record.GetValue())
410
		if err != nil {
Steven Allen's avatar
Steven Allen committed
411
			logger.Debug("received invalid record (discarded)")
412 413
			// return a sentinal to signify an invalid record was received
			err = errInvalidRecord
George Antoniadis's avatar
George Antoniadis committed
414
			record = new(recpb.Record)
415
		}
416
		return record, peers, err
417
	}
418

419 420 421 422
	if len(peers) > 0 {
		return nil, peers, nil
	}

423
	return nil, nil, routing.ErrNotFound
424 425
}

426
// getValueSingle simply performs the get value RPC with the given parameters
427
func (dht *IpfsDHT) getValueSingle(ctx context.Context, p peer.ID, key string) (*pb.Message, error) {
428
	pmes := pb.NewMessage(pb.Message_GET_VALUE, []byte(key), 0)
Steven Allen's avatar
Steven Allen committed
429
	return dht.sendRequest(ctx, p, pmes)
Jeromy's avatar
Jeromy committed
430 431
}

432
// getLocal attempts to retrieve the value from the datastore
433
func (dht *IpfsDHT) getLocal(key string) (*recpb.Record, error) {
Steven Allen's avatar
Steven Allen committed
434 435
	logger.Debugw("finding value in datastore", "key", loggableKeyString(key))

436
	rec, err := dht.getRecordFromDatastore(mkDsKey(key))
437
	if err != nil {
Steven Allen's avatar
Steven Allen committed
438
		logger.Warnw("get local failed", "key", key, "error", err)
439 440
		return nil, err
	}
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
441

442
	// Double check the key. Can't hurt.
443
	if rec != nil && string(rec.GetKey()) != key {
Steven Allen's avatar
Steven Allen committed
444
		logger.Errorw("BUG: found a DHT record that didn't match it's key", "expected", key, "got", rec.GetKey())
Steven Allen's avatar
Steven Allen committed
445
		return nil, nil
446 447

	}
448
	return rec, nil
449 450
}

451
// putLocal stores the key value pair in the datastore
452
func (dht *IpfsDHT) putLocal(key string, rec *recpb.Record) error {
453 454
	data, err := proto.Marshal(rec)
	if err != nil {
Steven Allen's avatar
Steven Allen committed
455
		logger.Warnw("failed to put marshal record for local put", "error", err, "key", key)
456 457 458
		return err
	}

459
	return dht.datastore.Put(mkDsKey(key), data)
460
}
461

Aarsh Shah's avatar
Aarsh Shah committed
462
// peerFound signals the routingTable that we've found a peer that
Aarsh Shah's avatar
Aarsh Shah committed
463
// might support the DHT protocol.
Aarsh Shah's avatar
Aarsh Shah committed
464 465 466 467 468 469 470 471 472 473 474
// If we have a connection a peer but no exchange of a query RPC ->
//    LastQueriedAt=time.Now (so we don't ping it for some time for a liveliness check)
//    LastUsefulAt=N/A
// If we connect to a peer and exchange a query RPC ->
//    LastQueriedAt=time.Now (same reason as above)
//    LastUsefulAt=time.Now (so we give it some life in the RT without immediately evicting it)
// If we query a peer we already have in our Routing Table ->
//    LastQueriedAt=time.Now()
//    LastUsefulAt remains unchanged
// If we connect to a peer we already have in the RT but do not exchange a query (rare)
//    Do Nothing.
Aarsh Shah's avatar
Aarsh Shah committed
475
func (dht *IpfsDHT) peerFound(ctx context.Context, p peer.ID, queryPeer bool) {
476 477 478
	if c := baseLogger.Check(zap.DebugLevel, "peer found"); c != nil {
		c.Write(zap.String("peer", p.String()))
	}
Aarsh Shah's avatar
Aarsh Shah committed
479 480
	b, err := dht.validRTPeer(p)
	if err != nil {
Steven Allen's avatar
Steven Allen committed
481
		logger.Errorw("failed to validate if peer is a DHT peer", "peer", p, "error", err)
Aarsh Shah's avatar
Aarsh Shah committed
482
	} else if b {
Aarsh Shah's avatar
Aarsh Shah committed
483
		newlyAdded, err := dht.routingTable.TryAddPeer(p, queryPeer)
Steven Allen's avatar
Steven Allen committed
484 485 486 487
		if err != nil {
			// peer not added.
			return
		}
Aarsh Shah's avatar
Aarsh Shah committed
488

Aarsh Shah's avatar
Aarsh Shah committed
489
		// If we freshly added the peer because of a query, we need to ensure we override the "zero" lastUsefulAt
Aarsh Shah's avatar
Aarsh Shah committed
490
		// value that must have been set in the Routing Table for this peer when it was first added during a connection.
Aarsh Shah's avatar
Aarsh Shah committed
491 492 493 494 495 496
		if newlyAdded && queryPeer {
			dht.routingTable.UpdateLastUsefulAt(p, time.Now())
		} else if queryPeer {
			// the peer is already in our RT, but we just successfully queried it and so let's give it a
			// bump on the query time so we don't ping it too soon for a liveliness check.
			dht.routingTable.UpdateLastSuccessfulOutboundQueryAt(p, time.Now())
Aarsh Shah's avatar
Aarsh Shah committed
497 498
		}
	}
Aarsh Shah's avatar
Aarsh Shah committed
499 500
}

Aarsh Shah's avatar
Aarsh Shah committed
501
// peerStoppedDHT signals the routing table that a peer is unable to responsd to DHT queries anymore.
Aarsh Shah's avatar
Aarsh Shah committed
502
func (dht *IpfsDHT) peerStoppedDHT(ctx context.Context, p peer.ID) {
Steven Allen's avatar
Steven Allen committed
503
	logger.Debugw("peer stopped dht", "peer", p)
Aarsh Shah's avatar
Aarsh Shah committed
504 505
	// A peer that does not support the DHT protocol is dead for us.
	// There's no point in talking to anymore till it starts supporting the DHT protocol again.
Aarsh Shah's avatar
Aarsh Shah committed
506 507 508
	dht.routingTable.RemovePeer(p)

	// since we lost a peer from the RT, we should do this here
509
	dht.fixRTIfNeeded()
Aarsh Shah's avatar
Aarsh Shah committed
510 511
}

512 513 514 515
func (dht *IpfsDHT) fixRTIfNeeded() {
	select {
	case dht.fixLowPeersChan <- struct{}{}:
	default:
Aarsh Shah's avatar
Aarsh Shah committed
516
	}
517
}
Jeromy's avatar
Jeromy committed
518

Jeromy's avatar
Jeromy committed
519
// FindLocal looks for a peer with a given ID connected to this dht and returns the peer and the table it was found in.
520
func (dht *IpfsDHT) FindLocal(id peer.ID) peer.AddrInfo {
521
	switch dht.host.Network().Connectedness(id) {
522
	case network.Connected, network.CanConnect:
523 524
		return dht.peerstore.PeerInfo(id)
	default:
525
		return peer.AddrInfo{}
Jeromy's avatar
Jeromy committed
526 527
	}
}
528

Jeromy's avatar
Jeromy committed
529
// findPeerSingle asks peer 'p' if they know where the peer with id 'id' is
530
func (dht *IpfsDHT) findPeerSingle(ctx context.Context, p peer.ID, id peer.ID) (*pb.Message, error) {
531
	pmes := pb.NewMessage(pb.Message_FIND_NODE, []byte(id), 0)
Steven Allen's avatar
Steven Allen committed
532
	return dht.sendRequest(ctx, p, pmes)
533
}
534

Adin Schmahmann's avatar
Adin Schmahmann committed
535 536
func (dht *IpfsDHT) findProvidersSingle(ctx context.Context, p peer.ID, key multihash.Multihash) (*pb.Message, error) {
	pmes := pb.NewMessage(pb.Message_GET_PROVIDERS, key, 0)
Steven Allen's avatar
Steven Allen committed
537
	return dht.sendRequest(ctx, p, pmes)
Jeromy's avatar
Jeromy committed
538 539
}

540
// nearestPeersToQuery returns the routing tables closest peers.
541
func (dht *IpfsDHT) nearestPeersToQuery(pmes *pb.Message, count int) []peer.ID {
542
	closer := dht.routingTable.NearestPeers(kb.ConvertKey(string(pmes.GetKey())), count)
543 544 545
	return closer
}

Aarsh Shah's avatar
Aarsh Shah committed
546
// betterPeersToQuery returns nearestPeersToQuery with some additional filtering
547
func (dht *IpfsDHT) betterPeersToQuery(pmes *pb.Message, from peer.ID, count int) []peer.ID {
548
	closer := dht.nearestPeersToQuery(pmes, count)
549 550 551

	// no node? nil
	if closer == nil {
Steven Allen's avatar
Steven Allen committed
552
		logger.Infow("no closer peers to send", from)
553 554 555
		return nil
	}

Steven Allen's avatar
Steven Allen committed
556
	filtered := make([]peer.ID, 0, len(closer))
Jeromy's avatar
Jeromy committed
557 558 559
	for _, clp := range closer {

		// == to self? thats bad
Jeromy's avatar
Jeromy committed
560
		if clp == dht.self {
Matt Joiner's avatar
Matt Joiner committed
561
			logger.Error("BUG betterPeersToQuery: attempted to return self! this shouldn't happen...")
562 563
			return nil
		}
564
		// Dont send a peer back themselves
565
		if clp == from {
566 567 568
			continue
		}

Jeromy's avatar
Jeromy committed
569
		filtered = append(filtered, clp)
570 571
	}

572 573
	// ok seems like closer nodes
	return filtered
574 575
}

576 577 578 579 580 581 582 583 584 585 586 587 588 589 590 591 592 593
// setMode switches the DHT to mode m while holding modeLk. It does nothing
// if the DHT is already in that mode, and returns an error for a mode it does
// not recognize.
func (dht *IpfsDHT) setMode(m mode) error {
	dht.modeLk.Lock()
	defer dht.modeLk.Unlock()

	switch {
	case m == dht.mode:
		return nil
	case m == modeServer:
		return dht.moveToServerMode()
	case m == modeClient:
		return dht.moveToClientMode()
	}
	return fmt.Errorf("unrecognized dht mode: %d", m)
}

Adin Schmahmann's avatar
Adin Schmahmann committed
594 595 596
// moveToServerMode advertises (via libp2p identify updates) that we are able to respond to DHT queries and sets the appropriate stream handlers.
// Note: We may support responding to queries with protocols aside from our primary ones in order to support
// interoperability with older versions of the DHT protocol.
597 598
func (dht *IpfsDHT) moveToServerMode() error {
	dht.mode = modeServer
Adin Schmahmann's avatar
Adin Schmahmann committed
599
	for _, p := range dht.serverProtocols {
600 601 602 603 604
		dht.host.SetStreamHandler(p, dht.handleNewStream)
	}
	return nil
}

Adin Schmahmann's avatar
Adin Schmahmann committed
605 606 607 608 609
// moveToClientMode stops advertising (and rescinds advertisements via libp2p identify updates) that we are able to
// respond to DHT queries and removes the appropriate stream handlers. We also kill all inbound streams that were
// utilizing the handled protocols.
// Note: We may support responding to queries with protocols aside from our primary ones in order to support
// interoperability with older versions of the DHT protocol.
610 611
func (dht *IpfsDHT) moveToClientMode() error {
	dht.mode = modeClient
Adin Schmahmann's avatar
Adin Schmahmann committed
612
	for _, p := range dht.serverProtocols {
613 614 615 616
		dht.host.RemoveStreamHandler(p)
	}

	pset := make(map[protocol.ID]bool)
Adin Schmahmann's avatar
Adin Schmahmann committed
617
	for _, p := range dht.serverProtocols {
618 619 620 621 622 623 624
		pset[p] = true
	}

	for _, c := range dht.host.Network().Conns() {
		for _, s := range c.GetStreams() {
			if pset[s.Protocol()] {
				if s.Stat().Direction == network.DirInbound {
Steven Allen's avatar
Steven Allen committed
625
					_ = s.Reset()
626 627 628 629 630 631 632 633 634 635 636 637 638
				}
			}
		}
	}
	return nil
}

// getMode returns the mode the DHT is currently operating in, read under
// modeLk.
func (dht *IpfsDHT) getMode() mode {
	dht.modeLk.Lock()
	current := dht.mode
	dht.modeLk.Unlock()
	return current
}

Alan Shaw's avatar
Alan Shaw committed
639
// Context returns the DHT's context.
640 641 642 643
func (dht *IpfsDHT) Context() context.Context {
	return dht.ctx
}

Alan Shaw's avatar
Alan Shaw committed
644
// Process returns the DHT's process.
645 646 647 648
func (dht *IpfsDHT) Process() goprocess.Process {
	return dht.proc
}

Alan Shaw's avatar
Alan Shaw committed
649
// RoutingTable returns the DHT's routingTable.
ZhengQi's avatar
ZhengQi committed
650 651 652 653
func (dht *IpfsDHT) RoutingTable() *kb.RoutingTable {
	return dht.routingTable
}

Alan Shaw's avatar
Alan Shaw committed
654
// Close calls Process Close.
655 656 657
func (dht *IpfsDHT) Close() error {
	return dht.proc.Close()
}
658 659 660 661

// mkDsKey converts a record key into a datastore key by encoding its bytes
// with base32.RawStdEncoding.
func mkDsKey(s string) ds.Key {
	encoded := base32.RawStdEncoding.EncodeToString([]byte(s))
	return ds.NewKey(encoded)
}
662

Alan Shaw's avatar
Alan Shaw committed
663
// PeerID returns the DHT node's Peer ID.
664 665 666 667
func (dht *IpfsDHT) PeerID() peer.ID {
	return dht.self
}

Alan Shaw's avatar
Alan Shaw committed
668
// PeerKey returns a DHT key, converted from the DHT node's Peer ID.
669 670 671 672
func (dht *IpfsDHT) PeerKey() []byte {
	return kb.ConvertPeerID(dht.self)
}

Alan Shaw's avatar
Alan Shaw committed
673
// Host returns the libp2p host this DHT is operating with.
674 675 676 677
func (dht *IpfsDHT) Host() host.Host {
	return dht.host
}

Alan Shaw's avatar
Alan Shaw committed
678
// Ping sends a ping message to the passed peer and waits for a response.
679 680 681 682
func (dht *IpfsDHT) Ping(ctx context.Context, p peer.ID) error {
	req := pb.NewMessage(pb.Message_PING, nil, 0)
	resp, err := dht.sendRequest(ctx, p, req)
	if err != nil {
Steven Allen's avatar
Steven Allen committed
683
		return fmt.Errorf("sending request: %w", err)
684 685
	}
	if resp.Type != pb.Message_PING {
Steven Allen's avatar
Steven Allen committed
686
		return fmt.Errorf("got unexpected response type: %v", resp.Type)
687 688 689
	}
	return nil
}
690 691 692 693 694 695 696 697 698 699 700 701 702 703 704 705

// newContextWithLocalTags derives a context from ctx that carries the
// InstanceID and PeerID metric tags of this DHT. Any extraTags are applied
// first, followed by the two local tags.
func (dht *IpfsDHT) newContextWithLocalTags(ctx context.Context, extraTags ...tag.Mutator) context.Context {
	peerTag := tag.Upsert(metrics.KeyPeerID, dht.self.Pretty())
	instanceTag := tag.Upsert(metrics.KeyInstanceID, fmt.Sprintf("%p", dht))
	extraTags = append(extraTags, peerTag, instanceTag)

	// The error is ignored as it is unrelated to the actual function of this code.
	tagged, _ := tag.New(ctx, extraTags...)
	return tagged
}
706 707 708 709 710 711 712 713

// maybeAddAddrs adds addrs for peer p to the peerstore with the given ttl,
// unless p is the local peer or a peer we are currently connected to: for
// those we already have better addresses.
func (dht *IpfsDHT) maybeAddAddrs(p peer.ID, addrs []ma.Multiaddr, ttl time.Duration) {
	if p == dht.self {
		return
	}
	if dht.host.Network().Connectedness(p) == network.Connected {
		return
	}
	dht.peerstore.AddAddrs(p, addrs, ttl)
}