dht.go 18.4 KB
Newer Older
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
1 2
package dht

3
import (
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
4
	"bytes"
Jeromy's avatar
Jeromy committed
5
	"context"
6
	"errors"
7
	"fmt"
8 9
	"sync"
	"time"
10

11 12 13 14 15 16 17
	"github.com/libp2p/go-libp2p-core/host"
	"github.com/libp2p/go-libp2p-core/network"
	"github.com/libp2p/go-libp2p-core/peer"
	"github.com/libp2p/go-libp2p-core/peerstore"
	"github.com/libp2p/go-libp2p-core/protocol"
	"github.com/libp2p/go-libp2p-core/routing"

18
	"go.opencensus.io/tag"
19

20
	"github.com/libp2p/go-libp2p-kad-dht/metrics"
21
	pb "github.com/libp2p/go-libp2p-kad-dht/pb"
Aarsh Shah's avatar
Aarsh Shah committed
22
	"github.com/libp2p/go-libp2p-kad-dht/providers"
23

Aarsh Shah's avatar
Aarsh Shah committed
24
	"github.com/gogo/protobuf/proto"
25 26
	ds "github.com/ipfs/go-datastore"
	logging "github.com/ipfs/go-log"
Aarsh Shah's avatar
Aarsh Shah committed
27
	"github.com/jbenet/goprocess"
Henrique Dias's avatar
Henrique Dias committed
28
	goprocessctx "github.com/jbenet/goprocess/context"
George Antoniadis's avatar
George Antoniadis committed
29
	kb "github.com/libp2p/go-libp2p-kbucket"
Henrique Dias's avatar
Henrique Dias committed
30
	record "github.com/libp2p/go-libp2p-record"
George Antoniadis's avatar
George Antoniadis committed
31
	recpb "github.com/libp2p/go-libp2p-record/pb"
Steven Allen's avatar
Steven Allen committed
32
	"github.com/multiformats/go-base32"
Adin Schmahmann's avatar
Adin Schmahmann committed
33
	"github.com/multiformats/go-multihash"
34 35
)

Matt Joiner's avatar
Matt Joiner committed
36
// logger is the package-wide logger, registered under the "dht" subsystem name.
var logger = logging.Logger("dht")
Aarsh Shah's avatar
Aarsh Shah committed
37
// rtPvLogger is a separate logger ("dht/rt-validation") used by the routing
// table peer validation function built in makeRoutingTable.
var rtPvLogger = logging.Logger("dht/rt-validation")
38

Henrique Dias's avatar
Henrique Dias committed
39 40
// BaseConnMgrScore is the base value used when tagging routing table peers in
// the connection manager under the "kbucket" tag; makeRoutingTable adds the
// common prefix length between the local peer and the tagged peer on top of it.
const BaseConnMgrScore = 5

41 42 43 44 45 46 47
// mode is the operating mode of a DHT node.
type mode int

// Both constants carry the explicit mode type. Writing "modeClient = 2"
// without the type would declare an untyped integer constant instead of a
// mode, because an explicit value does not inherit the type of the previous
// constant specification.
const (
	// modeServer is the mode in which stream handlers for the server
	// protocols are installed on the host (see moveToServerMode).
	modeServer mode = 1
	// modeClient is the mode in which those stream handlers are removed
	// (see moveToClientMode).
	modeClient mode = 2
)

Adin Schmahmann's avatar
Adin Schmahmann committed
48 49 50 51 52
// Protocol ID suffixes for the DHT wire protocol. makeDHT prepends the
// configured protocol prefix to them: kad2 is used as the primary protocol,
// while both kad2 and kad1 are registered as server protocols.
const (
	kad1 protocol.ID = "/kad/1.0.0"
	kad2 protocol.ID = "/kad/2.0.0"
)

53
// IpfsDHT is an implementation of Kademlia with S/Kademlia modifications.
54
// It is used to implement the base Routing module.
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
55
type IpfsDHT struct {
56 57 58
	host      host.Host           // the network services we need
	self      peer.ID             // Local peer (yourself)
	peerstore peerstore.Peerstore // Peer Registry
59

60
	datastore ds.Datastore // Local data
61

62
	routingTable *kb.RoutingTable // Array of routing tables for differently distanced nodes
63 64
	// ProviderManager stores & manages the provider records for this Dht peer.
	ProviderManager *providers.ProviderManager
65

Aarsh Shah's avatar
Aarsh Shah committed
66
	birth time.Time // When this peer started up
67

68
	Validator record.Validator
69

70 71
	ctx  context.Context
	proc goprocess.Process
72 73 74

	strmap map[peer.ID]*messageSender
	smlk   sync.Mutex
75

Steven Allen's avatar
Steven Allen committed
76
	plk sync.Mutex
77

78 79
	stripedPutLocks [256]sync.Mutex

Adin Schmahmann's avatar
Adin Schmahmann committed
80 81 82 83 84
	// Primary DHT protocols - we query and respond to these protocols
	protocols []protocol.ID

	// DHT protocols we can respond to (may contain protocols in addition to the primary protocols)
	serverProtocols []protocol.ID
85

86 87 88 89
	auto   bool
	mode   mode
	modeLk sync.Mutex

90
	bucketSize int
91
	alpha      int // The concurrency parameter per path
Adin Schmahmann's avatar
Adin Schmahmann committed
92
	beta       int // The number of peers closest to a target that must have responded for a query path to terminate
93

94 95 96
	queryPeerFilter        QueryFilterFunc
	routingTablePeerFilter RouteTableFilterFunc

97 98 99
	autoRefresh           bool
	rtRefreshQueryTimeout time.Duration
	rtRefreshPeriod       time.Duration
Steven Allen's avatar
Steven Allen committed
100
	triggerRtRefresh      chan chan<- error
Aarsh Shah's avatar
Aarsh Shah committed
101
	triggerSelfLookup     chan chan<- error
Aarsh Shah's avatar
Aarsh Shah committed
102 103

	maxRecordAge time.Duration
104

105 106 107
	// Allows disabling dht subsystems. These should _only_ be set on
	// "forked" DHTs (e.g., DHTs with custom protocols and/or private
	// networks).
108
	enableProviders, enableValues bool
109 110
}

Matt Joiner's avatar
Matt Joiner committed
111 112 113 114
// Assert that IPFS assumptions about interfaces aren't broken. These aren't a
// guarantee, but we can use them to aid refactoring.
var (
	_ routing.ContentRouting = (*IpfsDHT)(nil)
115
	_ routing.Routing        = (*IpfsDHT)(nil)
Matt Joiner's avatar
Matt Joiner committed
116 117 118 119 120
	_ routing.PeerRouting    = (*IpfsDHT)(nil)
	_ routing.PubKeyFetcher  = (*IpfsDHT)(nil)
	_ routing.ValueStore     = (*IpfsDHT)(nil)
)

121
// New creates a new DHT with the specified host and options.
122 123
func New(ctx context.Context, h host.Host, options ...Option) (*IpfsDHT, error) {
	var cfg config
Adin Schmahmann's avatar
Adin Schmahmann committed
124
	if err := cfg.apply(append([]Option{defaults}, options...)...); err != nil {
125 126
		return nil, err
	}
Aarsh Shah's avatar
Aarsh Shah committed
127

Adin Schmahmann's avatar
Adin Schmahmann committed
128 129
	if err := cfg.validate(); err != nil {
		return nil, err
130
	}
Aarsh Shah's avatar
Aarsh Shah committed
131 132 133 134
	dht, err := makeDHT(ctx, h, cfg)
	if err != nil {
		return nil, fmt.Errorf("failed to create DHT, err=%s", err)
	}
Adin Schmahmann's avatar
Adin Schmahmann committed
135

136 137 138
	dht.autoRefresh = cfg.routingTable.autoRefresh
	dht.rtRefreshPeriod = cfg.routingTable.refreshPeriod
	dht.rtRefreshQueryTimeout = cfg.routingTable.refreshQueryTimeout
139

140 141 142
	dht.maxRecordAge = cfg.maxRecordAge
	dht.enableProviders = cfg.enableProviders
	dht.enableValues = cfg.enableValues
Aarsh Shah's avatar
Aarsh Shah committed
143

144
	dht.Validator = cfg.validator
145

146 147
	switch cfg.mode {
	case ModeAuto:
148 149
		dht.auto = true
		dht.mode = modeClient
150
	case ModeClient:
151 152
		dht.auto = false
		dht.mode = modeClient
153
	case ModeServer:
154 155 156
		dht.auto = false
		dht.mode = modeServer
	default:
157
		return nil, fmt.Errorf("invalid dht mode %d", cfg.mode)
158 159 160 161 162
	}

	if dht.mode == modeServer {
		if err := dht.moveToServerMode(); err != nil {
			return nil, err
163
		}
164
	}
165 166 167 168 169 170 171 172 173 174

	// register for event bus and network notifications
	sn, err := newSubscriberNotifiee(dht)
	if err != nil {
		return nil, err
	}
	dht.proc.Go(sn.subscribe)
	// handle providers
	dht.proc.AddChild(dht.ProviderManager.Process())

Aarsh Shah's avatar
Aarsh Shah committed
175
	dht.startSelfLookup()
176
	dht.startRefreshing()
177 178
	return dht, nil
}
179

180 181 182 183
// NewDHT creates a new DHT object with the given peer as the 'local' host.
// IpfsDHT's initialized with this function will respond to DHT requests,
// whereas IpfsDHT's initialized with NewDHTClient will not.
func NewDHT(ctx context.Context, h host.Host, dstore ds.Batching) *IpfsDHT {
184
	dht, err := New(ctx, h, Datastore(dstore))
185 186 187 188 189 190 191 192 193 194 195
	if err != nil {
		panic(err)
	}
	return dht
}

// NewDHTClient creates a new DHT object with the given peer as the 'local'
// host. IpfsDHT clients initialized with this function will not respond to DHT
// requests. If you need a peer to respond to DHT requests, use NewDHT instead.
// NewDHTClient creates a new DHT object with the given peer as the 'local' host
func NewDHTClient(ctx context.Context, h host.Host, dstore ds.Batching) *IpfsDHT {
Adin Schmahmann's avatar
Adin Schmahmann committed
196
	dht, err := New(ctx, h, Datastore(dstore), Mode(ModeClient))
197 198 199
	if err != nil {
		panic(err)
	}
200 201 202
	return dht
}

Aarsh Shah's avatar
Aarsh Shah committed
203
func makeDHT(ctx context.Context, h host.Host, cfg config) (*IpfsDHT, error) {
Adin Schmahmann's avatar
Adin Schmahmann committed
204 205 206 207 208 209 210 211 212 213 214 215 216
	protocols := []protocol.ID{cfg.protocolPrefix + kad2}
	serverProtocols := []protocol.ID{cfg.protocolPrefix + kad2, cfg.protocolPrefix + kad1}

	// check if custom test protocols were set
	if len(cfg.testProtocols) > 0 {
		protocols = make([]protocol.ID, len(cfg.testProtocols))
		serverProtocols = make([]protocol.ID, len(cfg.testProtocols))
		for i, p := range cfg.testProtocols {
			protocols[i] = cfg.protocolPrefix + p
			serverProtocols[i] = cfg.protocolPrefix + p
		}
	}

217
	dht := &IpfsDHT{
218 219 220 221 222 223 224 225 226 227 228 229 230 231 232
		datastore:              cfg.datastore,
		self:                   h.ID(),
		peerstore:              h.Peerstore(),
		host:                   h,
		strmap:                 make(map[peer.ID]*messageSender),
		birth:                  time.Now(),
		protocols:              protocols,
		serverProtocols:        serverProtocols,
		bucketSize:             cfg.bucketSize,
		alpha:                  cfg.concurrency,
		beta:                   cfg.resiliency,
		triggerRtRefresh:       make(chan chan<- error),
		triggerSelfLookup:      make(chan chan<- error),
		queryPeerFilter:        cfg.queryPeerFilter,
		routingTablePeerFilter: cfg.routingTable.peerFilter,
Jeromy's avatar
Jeromy committed
233
	}
234

Aarsh Shah's avatar
Aarsh Shah committed
235 236 237 238 239 240 241
	// construct routing table
	rt, err := makeRoutingTable(dht, cfg)
	if err != nil {
		return nil, fmt.Errorf("failed to construct routing table,err=%s", err)
	}
	dht.routingTable = rt

242 243
	// create a DHT proc with the given context
	dht.proc = goprocessctx.WithContext(ctx)
Aarsh Shah's avatar
Aarsh Shah committed
244 245 246 247 248 249

	// create a tagged context derived from the original context
	ctxTags := dht.newContextWithLocalTags(ctx)
	// the DHT context should be done when the process is closed
	dht.ctx = goprocessctx.WithProcessClosing(ctxTags, dht.proc)

250
	dht.ProviderManager = providers.NewProviderManager(dht.ctx, h.ID(), cfg.datastore)
251

Aarsh Shah's avatar
Aarsh Shah committed
252
	return dht, nil
Jeromy's avatar
Jeromy committed
253 254
}

Aarsh Shah's avatar
Aarsh Shah committed
255 256
func makeRoutingTable(dht *IpfsDHT, cfg config) (*kb.RoutingTable, error) {
	self := kb.ConvertPeerID(dht.host.ID())
Aarsh Shah's avatar
Aarsh Shah committed
257 258
	// construct the routing table with a peer validation function
	pvF := func(c context.Context, p peer.ID) bool {
Aarsh Shah's avatar
Aarsh Shah committed
259 260
		// connect should work
		if err := dht.host.Connect(c, peer.AddrInfo{ID: p}); err != nil {
Aarsh Shah's avatar
Aarsh Shah committed
261
			rtPvLogger.Infof("failed to connect to peer %s for validation, err=%s", p, err)
Aarsh Shah's avatar
Aarsh Shah committed
262
			return false
263
		}
Aarsh Shah's avatar
Aarsh Shah committed
264 265 266 267 268

		// peer should support the DHT protocol
		b, err := dht.validRTPeer(p)
		if err != nil {
			rtPvLogger.Errorf("failed to check if peer %s supports DHT protocol, err=%s", p, err)
Aarsh Shah's avatar
Aarsh Shah committed
269
			return false
Aarsh Shah's avatar
Aarsh Shah committed
270 271
		}

272
		return b && cfg.routingTable.peerFilter(dht, dht.Host().Network().ConnsToPeer(p))
273 274
	}

Aarsh Shah's avatar
Aarsh Shah committed
275 276 277
	rtOpts := []kb.Option{kb.PeerValidationFnc(pvF)}
	if !(cfg.routingTable.checkInterval == 0) {
		rtOpts = append(rtOpts, kb.TableCleanupInterval(cfg.routingTable.checkInterval))
278
	}
Aarsh Shah's avatar
Aarsh Shah committed
279

Aarsh Shah's avatar
Aarsh Shah committed
280
	rt, err := kb.NewRoutingTable(cfg.bucketSize, self, time.Minute, dht.host.Peerstore(),
Aarsh Shah's avatar
Aarsh Shah committed
281
		rtOpts...)
Aarsh Shah's avatar
Aarsh Shah committed
282
	cmgr := dht.host.ConnManager()
Aarsh Shah's avatar
Aarsh Shah committed
283 284 285 286 287 288 289 290 291 292 293

	rt.PeerAdded = func(p peer.ID) {
		commonPrefixLen := kb.CommonPrefixLen(self, kb.ConvertPeerID(p))
		cmgr.TagPeer(p, "kbucket", BaseConnMgrScore+commonPrefixLen)
	}
	rt.PeerRemoved = func(p peer.ID) {
		cmgr.UntagPeer(p, "kbucket")
	}

	return rt, err
}
294

Jeromy's avatar
Jeromy committed
295
// putValueToPeer stores the given key/value pair at the peer 'p'
296 297
func (dht *IpfsDHT) putValueToPeer(ctx context.Context, p peer.ID, rec *recpb.Record) error {
	pmes := pb.NewMessage(pb.Message_PUT_VALUE, rec.Key, 0)
298
	pmes.Record = rec
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
299
	rpmes, err := dht.sendRequest(ctx, p, pmes)
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
300
	if err != nil {
Matt Joiner's avatar
Matt Joiner committed
301
		logger.Debugf("putValueToPeer: %v. (peer: %s, key: %s)", err, p.Pretty(), loggableKey(string(rec.Key)))
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
302 303
		return err
	}
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
304

305
	if !bytes.Equal(rpmes.GetRecord().Value, pmes.GetRecord().Value) {
Matt Joiner's avatar
Matt Joiner committed
306
		logger.Warningf("putValueToPeer: value not put correctly. (%v != %v)", pmes, rpmes)
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
307 308
		return errors.New("value not put correctly")
	}
gpestana's avatar
gpestana committed
309

Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
310
	return nil
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
311 312
}

313 314
// errInvalidRecord is the sentinel returned by getValueOrPeers when a peer
// answered with a record that failed validation.
var errInvalidRecord = errors.New("received invalid record")

315 316
// getValueOrPeers queries a particular peer p for the value for
// key. It returns either the value or a list of closer peers.
317
// NOTE: It will update the dht's peerstore with any new addresses
318
// it finds for the given peer.
319
func (dht *IpfsDHT) getValueOrPeers(ctx context.Context, p peer.ID, key string) (*recpb.Record, []*peer.AddrInfo, error) {
320
	pmes, err := dht.getValueSingle(ctx, p, key)
321
	if err != nil {
322
		return nil, nil, err
323 324
	}

325 326 327
	// Perhaps we were given closer peers
	peers := pb.PBPeersToPeerInfos(pmes.GetCloserPeers())

328
	if record := pmes.GetRecord(); record != nil {
329
		// Success! We were given the value
Matt Joiner's avatar
Matt Joiner committed
330
		logger.Debug("getValueOrPeers: got value")
331

332
		// make sure record is valid.
333
		err = dht.Validator.Validate(string(record.GetKey()), record.GetValue())
334
		if err != nil {
Matt Joiner's avatar
Matt Joiner committed
335
			logger.Info("Received invalid record! (discarded)")
336 337
			// return a sentinal to signify an invalid record was received
			err = errInvalidRecord
George Antoniadis's avatar
George Antoniadis committed
338
			record = new(recpb.Record)
339
		}
340
		return record, peers, err
341
	}
342

343
	if len(peers) > 0 {
Matt Joiner's avatar
Matt Joiner committed
344
		logger.Debug("getValueOrPeers: peers")
345 346 347
		return nil, peers, nil
	}

Matt Joiner's avatar
Matt Joiner committed
348
	logger.Warning("getValueOrPeers: routing.ErrNotFound")
349
	return nil, nil, routing.ErrNotFound
350 351
}

352
// getValueSingle simply performs the get value RPC with the given parameters
353 354 355 356 357 358
func (dht *IpfsDHT) getValueSingle(ctx context.Context, p peer.ID, key string) (*pb.Message, error) {
	meta := logging.LoggableMap{
		"key":  key,
		"peer": p,
	}

Matt Joiner's avatar
Matt Joiner committed
359
	eip := logger.EventBegin(ctx, "getValueSingle", meta)
ForrestWeston's avatar
ForrestWeston committed
360
	defer eip.Done()
361

362
	pmes := pb.NewMessage(pb.Message_GET_VALUE, []byte(key), 0)
363 364 365 366 367
	resp, err := dht.sendRequest(ctx, p, pmes)
	switch err {
	case nil:
		return resp, nil
	case ErrReadTimeout:
Matt Joiner's avatar
Matt Joiner committed
368
		logger.Warningf("getValueSingle: read timeout %s %s", p.Pretty(), key)
369 370
		fallthrough
	default:
ForrestWeston's avatar
ForrestWeston committed
371
		eip.SetError(err)
372 373
		return nil, err
	}
Jeromy's avatar
Jeromy committed
374 375
}

376
// getLocal attempts to retrieve the value from the datastore
377
func (dht *IpfsDHT) getLocal(key string) (*recpb.Record, error) {
Matt Joiner's avatar
Matt Joiner committed
378
	logger.Debugf("getLocal %s", key)
379
	rec, err := dht.getRecordFromDatastore(mkDsKey(key))
380
	if err != nil {
Matt Joiner's avatar
Matt Joiner committed
381
		logger.Warningf("getLocal: %s", err)
382 383
		return nil, err
	}
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
384

385
	// Double check the key. Can't hurt.
386
	if rec != nil && string(rec.GetKey()) != key {
Matt Joiner's avatar
Matt Joiner committed
387
		logger.Errorf("BUG getLocal: found a DHT record that didn't match it's key: %s != %s", rec.GetKey(), key)
Steven Allen's avatar
Steven Allen committed
388
		return nil, nil
389 390

	}
391
	return rec, nil
392 393
}

394
// putLocal stores the key value pair in the datastore
395
func (dht *IpfsDHT) putLocal(key string, rec *recpb.Record) error {
Matt Joiner's avatar
Matt Joiner committed
396
	logger.Debugf("putLocal: %v %v", key, rec)
397 398
	data, err := proto.Marshal(rec)
	if err != nil {
Matt Joiner's avatar
Matt Joiner committed
399
		logger.Warningf("putLocal: %s", err)
400 401 402
		return err
	}

403
	return dht.datastore.Put(mkDsKey(key), data)
404
}
405

Aarsh Shah's avatar
Aarsh Shah committed
406 407 408 409 410 411 412 413 414 415 416 417 418 419 420 421 422 423 424
// peerFound signals the routingTable that we've found a peer that
// supports the DHT protocol. It logs a "peerFound" event and reports the peer
// to the routing table as alive.
func (dht *IpfsDHT) peerFound(ctx context.Context, p peer.ID) {
	logger.Event(ctx, "peerFound", p)
	dht.routingTable.HandlePeerAlive(p)
}

// peerStoppedDHT signals the routing table that a peer has stopped supporting the DHT protocol.
// It logs a "peerStoppedDHT" event and reports the peer to the routing table as dead.
func (dht *IpfsDHT) peerStoppedDHT(ctx context.Context, p peer.ID) {
	logger.Event(ctx, "peerStoppedDHT", p)
	// A peer that does not support the DHT protocol is dead for us.
	// There's no point in talking to it anymore until it starts supporting the DHT protocol again.
	dht.routingTable.HandlePeerDead(p)
}

// peerDisconnected tells the routing table that peer p is no longer
// connected. A "peerDisconnected" event is logged first.
func (dht *IpfsDHT) peerDisconnected(ctx context.Context, p peer.ID) {
	logger.Event(ctx, "peerDisconnected", p)
	dht.routingTable.HandlePeerDisconnect(p)
}
Jeromy's avatar
Jeromy committed
426

Jeromy's avatar
Jeromy committed
427
// FindLocal looks for a peer with a given ID connected to this dht and returns the peer and the table it was found in.
428
func (dht *IpfsDHT) FindLocal(id peer.ID) peer.AddrInfo {
429
	switch dht.host.Network().Connectedness(id) {
430
	case network.Connected, network.CanConnect:
431 432
		return dht.peerstore.PeerInfo(id)
	default:
433
		return peer.AddrInfo{}
Jeromy's avatar
Jeromy committed
434 435
	}
}
436

Jeromy's avatar
Jeromy committed
437
// findPeerSingle asks peer 'p' if they know where the peer with id 'id' is
438
func (dht *IpfsDHT) findPeerSingle(ctx context.Context, p peer.ID, id peer.ID) (*pb.Message, error) {
Matt Joiner's avatar
Matt Joiner committed
439
	eip := logger.EventBegin(ctx, "findPeerSingle",
440 441 442 443
		logging.LoggableMap{
			"peer":   p,
			"target": id,
		})
ForrestWeston's avatar
ForrestWeston committed
444
	defer eip.Done()
445

446
	pmes := pb.NewMessage(pb.Message_FIND_NODE, []byte(id), 0)
447 448 449 450 451
	resp, err := dht.sendRequest(ctx, p, pmes)
	switch err {
	case nil:
		return resp, nil
	case ErrReadTimeout:
Matt Joiner's avatar
Matt Joiner committed
452
		logger.Warningf("read timeout: %s %s", p.Pretty(), id)
453 454
		fallthrough
	default:
ForrestWeston's avatar
ForrestWeston committed
455
		eip.SetError(err)
456 457
		return nil, err
	}
458
}
459

Adin Schmahmann's avatar
Adin Schmahmann committed
460 461
func (dht *IpfsDHT) findProvidersSingle(ctx context.Context, p peer.ID, key multihash.Multihash) (*pb.Message, error) {
	eip := logger.EventBegin(ctx, "findProvidersSingle", p, multihashLoggableKey(key))
ForrestWeston's avatar
ForrestWeston committed
462
	defer eip.Done()
463

Adin Schmahmann's avatar
Adin Schmahmann committed
464
	pmes := pb.NewMessage(pb.Message_GET_PROVIDERS, key, 0)
465 466 467 468 469
	resp, err := dht.sendRequest(ctx, p, pmes)
	switch err {
	case nil:
		return resp, nil
	case ErrReadTimeout:
Adin Schmahmann's avatar
Adin Schmahmann committed
470
		logger.Warningf("read timeout: %s %s", p.Pretty(), key)
471 472
		fallthrough
	default:
ForrestWeston's avatar
ForrestWeston committed
473
		eip.SetError(err)
474 475
		return nil, err
	}
Jeromy's avatar
Jeromy committed
476 477
}

478
// nearestPeersToQuery returns the routing tables closest peers.
479
func (dht *IpfsDHT) nearestPeersToQuery(pmes *pb.Message, count int) []peer.ID {
480
	closer := dht.routingTable.NearestPeers(kb.ConvertKey(string(pmes.GetKey())), count)
481 482 483
	return closer
}

Aarsh Shah's avatar
Aarsh Shah committed
484
// betterPeersToQuery returns nearestPeersToQuery with some additional filtering
485
func (dht *IpfsDHT) betterPeersToQuery(pmes *pb.Message, p peer.ID, count int) []peer.ID {
486
	closer := dht.nearestPeersToQuery(pmes, count)
487 488 489

	// no node? nil
	if closer == nil {
Matt Joiner's avatar
Matt Joiner committed
490
		logger.Warning("betterPeersToQuery: no closer peers to send:", p)
491 492 493
		return nil
	}

Steven Allen's avatar
Steven Allen committed
494
	filtered := make([]peer.ID, 0, len(closer))
Jeromy's avatar
Jeromy committed
495 496 497
	for _, clp := range closer {

		// == to self? thats bad
Jeromy's avatar
Jeromy committed
498
		if clp == dht.self {
Matt Joiner's avatar
Matt Joiner committed
499
			logger.Error("BUG betterPeersToQuery: attempted to return self! this shouldn't happen...")
500 501
			return nil
		}
502
		// Dont send a peer back themselves
Jeromy's avatar
Jeromy committed
503
		if clp == p {
504 505 506
			continue
		}

Jeromy's avatar
Jeromy committed
507
		filtered = append(filtered, clp)
508 509
	}

510 511
	// ok seems like closer nodes
	return filtered
512 513
}

514 515 516 517 518 519 520 521 522 523 524 525 526 527 528 529 530 531
// setMode switches the DHT to mode m while holding modeLk. It is a no-op when
// the DHT is already in that mode, and returns an error for a mode that is
// neither modeServer nor modeClient.
func (dht *IpfsDHT) setMode(m mode) error {
	dht.modeLk.Lock()
	defer dht.modeLk.Unlock()

	if dht.mode == m {
		return nil
	}
	if m == modeServer {
		return dht.moveToServerMode()
	}
	if m == modeClient {
		return dht.moveToClientMode()
	}
	return fmt.Errorf("unrecognized dht mode: %d", m)
}

Adin Schmahmann's avatar
Adin Schmahmann committed
532 533 534
// moveToServerMode advertises (via libp2p identify updates) that we are able to respond to DHT queries and sets the appropriate stream handlers.
// Note: We may support responding to queries with protocols aside from our primary ones in order to support
// interoperability with older versions of the DHT protocol.
535 536
func (dht *IpfsDHT) moveToServerMode() error {
	dht.mode = modeServer
Adin Schmahmann's avatar
Adin Schmahmann committed
537
	for _, p := range dht.serverProtocols {
538 539 540 541 542
		dht.host.SetStreamHandler(p, dht.handleNewStream)
	}
	return nil
}

Adin Schmahmann's avatar
Adin Schmahmann committed
543 544 545 546 547
// moveToClientMode stops advertising (and rescinds advertisements via libp2p identify updates) that we are able to
// respond to DHT queries and removes the appropriate stream handlers. We also kill all inbound streams that were
// utilizing the handled protocols.
// Note: We may support responding to queries with protocols aside from our primary ones in order to support
// interoperability with older versions of the DHT protocol.
548 549
func (dht *IpfsDHT) moveToClientMode() error {
	dht.mode = modeClient
Adin Schmahmann's avatar
Adin Schmahmann committed
550
	for _, p := range dht.serverProtocols {
551 552 553 554
		dht.host.RemoveStreamHandler(p)
	}

	pset := make(map[protocol.ID]bool)
Adin Schmahmann's avatar
Adin Schmahmann committed
555
	for _, p := range dht.serverProtocols {
556 557 558 559 560 561 562 563 564 565 566 567 568 569 570 571 572 573 574 575 576
		pset[p] = true
	}

	for _, c := range dht.host.Network().Conns() {
		for _, s := range c.GetStreams() {
			if pset[s.Protocol()] {
				if s.Stat().Direction == network.DirInbound {
					s.Reset()
				}
			}
		}
	}
	return nil
}

// getMode returns the DHT's current mode, reading it while holding modeLk.
func (dht *IpfsDHT) getMode() mode {
	dht.modeLk.Lock()
	defer dht.modeLk.Unlock()
	return dht.mode
}

577 578 579 580 581 582 583 584 585 586
// Context returns the DHT's context.
func (dht *IpfsDHT) Context() context.Context {
	return dht.ctx
}

// Process returns the DHT's process.
func (dht *IpfsDHT) Process() goprocess.Process {
	return dht.proc
}

ZhengQi's avatar
ZhengQi committed
587 588 589 590 591
// RoutingTable returns the DHT's routing table.
func (dht *IpfsDHT) RoutingTable() *kb.RoutingTable {
	return dht.routingTable
}

592 593 594 595
// Close closes the DHT's process and returns the error from that call, if any.
func (dht *IpfsDHT) Close() error {
	return dht.proc.Close()
}
596 597 598 599

// mkDsKey converts a DHT key into a datastore key by encoding its bytes with
// unpadded standard base32.
func mkDsKey(s string) ds.Key {
	encoded := base32.RawStdEncoding.EncodeToString([]byte(s))
	return ds.NewKey(encoded)
}
600 601 602 603 604 605 606 607 608 609 610 611 612 613 614 615 616

// PeerID returns the ID of the local peer.
func (dht *IpfsDHT) PeerID() peer.ID {
	return dht.self
}

// PeerKey returns the local peer ID converted with kb.ConvertPeerID, the same
// conversion makeRoutingTable uses for the routing table's local ID.
func (dht *IpfsDHT) PeerKey() []byte {
	return kb.ConvertPeerID(dht.self)
}

// Host returns the libp2p host this DHT was created with.
func (dht *IpfsDHT) Host() host.Host {
	return dht.host
}

func (dht *IpfsDHT) Ping(ctx context.Context, p peer.ID) error {
	req := pb.NewMessage(pb.Message_PING, nil, 0)
	resp, err := dht.sendRequest(ctx, p, req)
	if err != nil {
Steven Allen's avatar
Steven Allen committed
617
		return fmt.Errorf("sending request: %w", err)
618 619
	}
	if resp.Type != pb.Message_PING {
Steven Allen's avatar
Steven Allen committed
620
		return fmt.Errorf("got unexpected response type: %v", resp.Type)
621 622 623
	}
	return nil
}
624 625 626 627 628 629 630 631 632 633 634 635 636 637 638 639

// newContextWithLocalTags derives a context from ctx that carries the PeerID
// tag (the local peer ID) and the InstanceID tag (the address of this DHT
// value). Any extraTags supplied by the caller are applied first, followed by
// these two.
func (dht *IpfsDHT) newContextWithLocalTags(ctx context.Context, extraTags ...tag.Mutator) context.Context {
	peerTag := tag.Upsert(metrics.KeyPeerID, dht.self.Pretty())
	instanceTag := tag.Upsert(metrics.KeyInstanceID, fmt.Sprintf("%p", dht))
	mutators := append(extraTags, peerTag, instanceTag)

	// The error is ignored as it is unrelated to the actual function of this code.
	tagged, _ := tag.New(ctx, mutators...)
	return tagged
}