dht.go 16.3 KB
Newer Older
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
1 2
package dht

3
import (
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
4
	"bytes"
Jeromy's avatar
Jeromy committed
5
	"context"
6
	"errors"
7
	"fmt"
Adin Schmahmann's avatar
Adin Schmahmann committed
8
	"math/rand"
9 10
	"sync"
	"time"
11

12 13 14 15 16 17 18
	"github.com/libp2p/go-libp2p-core/host"
	"github.com/libp2p/go-libp2p-core/network"
	"github.com/libp2p/go-libp2p-core/peer"
	"github.com/libp2p/go-libp2p-core/peerstore"
	"github.com/libp2p/go-libp2p-core/protocol"
	"github.com/libp2p/go-libp2p-core/routing"

19
	"go.opencensus.io/tag"
20

21
	"github.com/libp2p/go-libp2p-kad-dht/metrics"
22
	pb "github.com/libp2p/go-libp2p-kad-dht/pb"
Aarsh Shah's avatar
Aarsh Shah committed
23
	"github.com/libp2p/go-libp2p-kad-dht/providers"
24

Aarsh Shah's avatar
Aarsh Shah committed
25
	"github.com/gogo/protobuf/proto"
26 27
	ds "github.com/ipfs/go-datastore"
	logging "github.com/ipfs/go-log"
Aarsh Shah's avatar
Aarsh Shah committed
28
	"github.com/jbenet/goprocess"
Henrique Dias's avatar
Henrique Dias committed
29
	goprocessctx "github.com/jbenet/goprocess/context"
George Antoniadis's avatar
George Antoniadis committed
30
	kb "github.com/libp2p/go-libp2p-kbucket"
Henrique Dias's avatar
Henrique Dias committed
31
	record "github.com/libp2p/go-libp2p-record"
George Antoniadis's avatar
George Antoniadis committed
32
	recpb "github.com/libp2p/go-libp2p-record/pb"
Steven Allen's avatar
Steven Allen committed
33
	"github.com/multiformats/go-base32"
Adin Schmahmann's avatar
Adin Schmahmann committed
34
	"github.com/multiformats/go-multihash"
35 36
)

Matt Joiner's avatar
Matt Joiner committed
37
// logger is the package-level logger, registered under the name "dht".
var logger = logging.Logger("dht")
38

Henrique Dias's avatar
Henrique Dias committed
39 40
// BaseConnMgrScore is the base value used when tagging routing-table peers in
// the host's connection manager; the common prefix length between the local
// peer and the tagged peer is added on top of it.
const BaseConnMgrScore = 5

41 42 43 44 45 46 47
// mode is the operating mode of a DHT instance.
type mode int

const (
	// modeServer is the mode in which the DHT registers stream handlers for
	// its protocols.
	modeServer mode = 1
	// modeClient is the mode in which the DHT removes its stream handlers.
	// The type is spelled out explicitly: a constant with its own expression
	// does not inherit the type of the preceding constant in the group.
	modeClient mode = 2
)

48
// IpfsDHT is an implementation of Kademlia with S/Kademlia modifications.
49
// It is used to implement the base Routing module.
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
50
type IpfsDHT struct {
51 52 53
	host      host.Host           // the network services we need
	self      peer.ID             // Local peer (yourself)
	peerstore peerstore.Peerstore // Peer Registry
54

55
	datastore ds.Datastore // Local data
56

57
	routingTable *kb.RoutingTable // Array of routing tables for differently distanced nodes
58 59
	// ProviderManager stores & manages the provider records for this Dht peer.
	ProviderManager *providers.ProviderManager
60

Adin Schmahmann's avatar
Adin Schmahmann committed
61 62
	birth time.Time  // When this peer started up
	rng   *rand.Rand // Source of randomness
63

64
	Validator record.Validator
65

66 67
	ctx  context.Context
	proc goprocess.Process
68 69 70

	strmap map[peer.ID]*messageSender
	smlk   sync.Mutex
71

Steven Allen's avatar
Steven Allen committed
72
	plk sync.Mutex
73

74 75
	stripedPutLocks [256]sync.Mutex

76
	protocols []protocol.ID // DHT protocols
77

78 79 80 81
	auto   bool
	mode   mode
	modeLk sync.Mutex

82
	bucketSize int
83
	alpha      int // The concurrency parameter per path
Adin Schmahmann's avatar
Adin Schmahmann committed
84
	d          int // Number of Disjoint Paths to query
85

86 87 88
	autoRefresh           bool
	rtRefreshQueryTimeout time.Duration
	rtRefreshPeriod       time.Duration
Steven Allen's avatar
Steven Allen committed
89
	triggerRtRefresh      chan chan<- error
Aarsh Shah's avatar
Aarsh Shah committed
90 91

	maxRecordAge time.Duration
92

93 94 95
	// Allows disabling dht subsystems. These should _only_ be set on
	// "forked" DHTs (e.g., DHTs with custom protocols and/or private
	// networks).
96
	enableProviders, enableValues bool
97 98
}

Matt Joiner's avatar
Matt Joiner committed
99 100 101 102
// Assert that IPFS assumptions about interfaces aren't broken. These aren't a
// guarantee, but we can use them to aid refactoring.
var (
	_ routing.ContentRouting = (*IpfsDHT)(nil)
103
	_ routing.Routing        = (*IpfsDHT)(nil)
Matt Joiner's avatar
Matt Joiner committed
104 105 106 107 108
	_ routing.PeerRouting    = (*IpfsDHT)(nil)
	_ routing.PubKeyFetcher  = (*IpfsDHT)(nil)
	_ routing.ValueStore     = (*IpfsDHT)(nil)
)

109
// New creates a new DHT with the specified host and options.
110 111 112
func New(ctx context.Context, h host.Host, options ...Option) (*IpfsDHT, error) {
	var cfg config
	if err := cfg.Apply(append([]Option{defaults}, options...)...); err != nil {
113 114
		return nil, err
	}
115 116
	if cfg.disjointPaths == 0 {
		cfg.disjointPaths = cfg.bucketSize / 2
117 118
	}
	dht := makeDHT(ctx, h, cfg)
119 120 121
	dht.autoRefresh = cfg.routingTable.autoRefresh
	dht.rtRefreshPeriod = cfg.routingTable.refreshPeriod
	dht.rtRefreshQueryTimeout = cfg.routingTable.refreshQueryTimeout
122

123 124 125
	dht.maxRecordAge = cfg.maxRecordAge
	dht.enableProviders = cfg.enableProviders
	dht.enableValues = cfg.enableValues
Aarsh Shah's avatar
Aarsh Shah committed
126

127
	dht.Validator = cfg.validator
128

129 130
	switch cfg.mode {
	case ModeAuto:
131 132
		dht.auto = true
		dht.mode = modeClient
133
	case ModeClient:
134 135
		dht.auto = false
		dht.mode = modeClient
136
	case ModeServer:
137 138 139
		dht.auto = false
		dht.mode = modeServer
	default:
140
		return nil, fmt.Errorf("invalid dht mode %d", cfg.mode)
141 142 143 144 145
	}

	if dht.mode == modeServer {
		if err := dht.moveToServerMode(); err != nil {
			return nil, err
146
		}
147
	}
148 149 150 151 152 153 154 155 156 157

	// register for event bus and network notifications
	sn, err := newSubscriberNotifiee(dht)
	if err != nil {
		return nil, err
	}
	dht.proc.Go(sn.subscribe)
	// handle providers
	dht.proc.AddChild(dht.ProviderManager.Process())

158
	dht.startRefreshing()
159 160
	return dht, nil
}
161

162 163 164 165
// NewDHT creates a new DHT object with the given peer as the 'local' host.
// IpfsDHT's initialized with this function will respond to DHT requests,
// whereas IpfsDHT's initialized with NewDHTClient will not.
func NewDHT(ctx context.Context, h host.Host, dstore ds.Batching) *IpfsDHT {
166
	dht, err := New(ctx, h, Datastore(dstore))
167 168 169 170 171 172 173 174 175 176 177
	if err != nil {
		panic(err)
	}
	return dht
}

// NewDHTClient creates a new DHT object with the given peer as the 'local'
// host. IpfsDHT clients initialized with this function will not respond to DHT
// requests. If you need a peer to respond to DHT requests, use NewDHT instead.
// NewDHTClient creates a new DHT object with the given peer as the 'local' host
func NewDHTClient(ctx context.Context, h host.Host, dstore ds.Batching) *IpfsDHT {
178
	dht, err := New(ctx, h, Datastore(dstore), Client(true))
179 180 181
	if err != nil {
		panic(err)
	}
182 183 184
	return dht
}

185
func makeDHT(ctx context.Context, h host.Host, cfg config) *IpfsDHT {
Henrique Dias's avatar
Henrique Dias committed
186
	self := kb.ConvertPeerID(h.ID())
187
	rt := kb.NewRoutingTable(cfg.bucketSize, self, cfg.routingTable.latencyTolerance, h.Peerstore())
188
	cmgr := h.ConnManager()
189

190
	rt.PeerAdded = func(p peer.ID) {
Henrique Dias's avatar
Henrique Dias committed
191
		commonPrefixLen := kb.CommonPrefixLen(self, kb.ConvertPeerID(p))
Henrique Dias's avatar
Henrique Dias committed
192
		cmgr.TagPeer(p, "kbucket", BaseConnMgrScore+commonPrefixLen)
193
	}
194

195 196 197 198
	rt.PeerRemoved = func(p peer.ID) {
		cmgr.UntagPeer(p, "kbucket")
	}

199
	dht := &IpfsDHT{
200
		datastore:        cfg.datastore,
Aarsh Shah's avatar
Aarsh Shah committed
201 202 203 204 205
		self:             h.ID(),
		peerstore:        h.Peerstore(),
		host:             h,
		strmap:           make(map[peer.ID]*messageSender),
		birth:            time.Now(),
Adin Schmahmann's avatar
Adin Schmahmann committed
206
		rng:              rand.New(rand.NewSource(rand.Int63())),
Aarsh Shah's avatar
Aarsh Shah committed
207
		routingTable:     rt,
208 209 210 211
		protocols:        cfg.protocols,
		bucketSize:       cfg.bucketSize,
		alpha:            cfg.concurrency,
		d:                cfg.disjointPaths,
Steven Allen's avatar
Steven Allen committed
212
		triggerRtRefresh: make(chan chan<- error),
Jeromy's avatar
Jeromy committed
213
	}
214

215 216
	// create a DHT proc with the given context
	dht.proc = goprocessctx.WithContext(ctx)
Aarsh Shah's avatar
Aarsh Shah committed
217 218 219 220 221 222

	// create a tagged context derived from the original context
	ctxTags := dht.newContextWithLocalTags(ctx)
	// the DHT context should be done when the process is closed
	dht.ctx = goprocessctx.WithProcessClosing(ctxTags, dht.proc)

223
	dht.ProviderManager = providers.NewProviderManager(dht.ctx, h.ID(), cfg.datastore)
224 225

	return dht
Jeromy's avatar
Jeromy committed
226 227
}

Aarsh Shah's avatar
Aarsh Shah committed
228 229 230 231
// TODO Implement RT seeding as described in https://github.com/libp2p/go-libp2p-kad-dht/pull/384#discussion_r320994340 OR
// come up with an alternative solution.
// issue is being tracked at https://github.com/libp2p/go-libp2p-kad-dht/issues/387
/*func (dht *IpfsDHT) rtRecovery(proc goprocess.Process) {
232
	writeResp := func(errorChan chan error, err error) {
233 234
		select {
		case <-proc.Closing():
235
		case errorChan <- errChan:
236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260
		}
		close(errorChan)
	}

	for {
		select {
		case req := <-dht.rtRecoveryChan:
			if dht.routingTable.Size() == 0 {
				logger.Infof("rt recovery proc: received request with reqID=%s, RT is empty. initiating recovery", req.id)
				// TODO Call Seeder with default bootstrap peers here once #383 is merged
				if dht.routingTable.Size() > 0 {
					logger.Infof("rt recovery proc: successfully recovered RT for reqID=%s, RT size is now %d", req.id, dht.routingTable.Size())
					go writeResp(req.errorChan, nil)
				} else {
					logger.Errorf("rt recovery proc: failed to recover RT for reqID=%s, RT is still empty", req.id)
					go writeResp(req.errorChan, errors.New("RT empty after seed attempt"))
				}
			} else {
				logger.Infof("rt recovery proc: RT is not empty, no need to act on request with reqID=%s", req.id)
				go writeResp(req.errorChan, nil)
			}
		case <-proc.Closing():
			return
		}
	}
Aarsh Shah's avatar
Aarsh Shah committed
261
}*/
262

Jeromy's avatar
Jeromy committed
263
// putValueToPeer stores the given key/value pair at the peer 'p'
264 265
func (dht *IpfsDHT) putValueToPeer(ctx context.Context, p peer.ID, rec *recpb.Record) error {
	pmes := pb.NewMessage(pb.Message_PUT_VALUE, rec.Key, 0)
266
	pmes.Record = rec
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
267
	rpmes, err := dht.sendRequest(ctx, p, pmes)
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
268
	if err != nil {
Matt Joiner's avatar
Matt Joiner committed
269
		logger.Debugf("putValueToPeer: %v. (peer: %s, key: %s)", err, p.Pretty(), loggableKey(string(rec.Key)))
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
270 271
		return err
	}
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
272

273
	if !bytes.Equal(rpmes.GetRecord().Value, pmes.GetRecord().Value) {
Matt Joiner's avatar
Matt Joiner committed
274
		logger.Warningf("putValueToPeer: value not put correctly. (%v != %v)", pmes, rpmes)
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
275 276
		return errors.New("value not put correctly")
	}
gpestana's avatar
gpestana committed
277

Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
278
	return nil
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
279 280
}

281 282
// errInvalidRecord is a sentinel error signifying that a record received from
// a peer failed validation.
var errInvalidRecord = errors.New("received invalid record")

283 284
// getValueOrPeers queries a particular peer p for the value for
// key. It returns either the value or a list of closer peers.
285
// NOTE: It will update the dht's peerstore with any new addresses
286
// it finds for the given peer.
287
func (dht *IpfsDHT) getValueOrPeers(ctx context.Context, p peer.ID, key string) (*recpb.Record, []*peer.AddrInfo, error) {
288
	pmes, err := dht.getValueSingle(ctx, p, key)
289
	if err != nil {
290
		return nil, nil, err
291 292
	}

293 294 295
	// Perhaps we were given closer peers
	peers := pb.PBPeersToPeerInfos(pmes.GetCloserPeers())

296
	if record := pmes.GetRecord(); record != nil {
297
		// Success! We were given the value
Matt Joiner's avatar
Matt Joiner committed
298
		logger.Debug("getValueOrPeers: got value")
299

300
		// make sure record is valid.
301
		err = dht.Validator.Validate(string(record.GetKey()), record.GetValue())
302
		if err != nil {
Matt Joiner's avatar
Matt Joiner committed
303
			logger.Info("Received invalid record! (discarded)")
304 305
			// return a sentinal to signify an invalid record was received
			err = errInvalidRecord
George Antoniadis's avatar
George Antoniadis committed
306
			record = new(recpb.Record)
307
		}
308
		return record, peers, err
309
	}
310

311
	if len(peers) > 0 {
Matt Joiner's avatar
Matt Joiner committed
312
		logger.Debug("getValueOrPeers: peers")
313 314 315
		return nil, peers, nil
	}

Matt Joiner's avatar
Matt Joiner committed
316
	logger.Warning("getValueOrPeers: routing.ErrNotFound")
317
	return nil, nil, routing.ErrNotFound
318 319
}

320
// getValueSingle simply performs the get value RPC with the given parameters
321 322 323 324 325 326
func (dht *IpfsDHT) getValueSingle(ctx context.Context, p peer.ID, key string) (*pb.Message, error) {
	meta := logging.LoggableMap{
		"key":  key,
		"peer": p,
	}

Matt Joiner's avatar
Matt Joiner committed
327
	eip := logger.EventBegin(ctx, "getValueSingle", meta)
ForrestWeston's avatar
ForrestWeston committed
328
	defer eip.Done()
329

330
	pmes := pb.NewMessage(pb.Message_GET_VALUE, []byte(key), 0)
331 332 333 334 335
	resp, err := dht.sendRequest(ctx, p, pmes)
	switch err {
	case nil:
		return resp, nil
	case ErrReadTimeout:
Matt Joiner's avatar
Matt Joiner committed
336
		logger.Warningf("getValueSingle: read timeout %s %s", p.Pretty(), key)
337 338
		fallthrough
	default:
ForrestWeston's avatar
ForrestWeston committed
339
		eip.SetError(err)
340 341
		return nil, err
	}
Jeromy's avatar
Jeromy committed
342 343
}

344
// getLocal attempts to retrieve the value from the datastore
345
func (dht *IpfsDHT) getLocal(key string) (*recpb.Record, error) {
Matt Joiner's avatar
Matt Joiner committed
346
	logger.Debugf("getLocal %s", key)
347
	rec, err := dht.getRecordFromDatastore(mkDsKey(key))
348
	if err != nil {
Matt Joiner's avatar
Matt Joiner committed
349
		logger.Warningf("getLocal: %s", err)
350 351
		return nil, err
	}
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
352

353
	// Double check the key. Can't hurt.
354
	if rec != nil && string(rec.GetKey()) != key {
Matt Joiner's avatar
Matt Joiner committed
355
		logger.Errorf("BUG getLocal: found a DHT record that didn't match it's key: %s != %s", rec.GetKey(), key)
Steven Allen's avatar
Steven Allen committed
356
		return nil, nil
357 358

	}
359
	return rec, nil
360 361
}

362
// putLocal stores the key value pair in the datastore
363
func (dht *IpfsDHT) putLocal(key string, rec *recpb.Record) error {
Matt Joiner's avatar
Matt Joiner committed
364
	logger.Debugf("putLocal: %v %v", key, rec)
365 366
	data, err := proto.Marshal(rec)
	if err != nil {
Matt Joiner's avatar
Matt Joiner committed
367
		logger.Warningf("putLocal: %s", err)
368 369 370
		return err
	}

371
	return dht.datastore.Put(mkDsKey(key), data)
372
}
373

374
// Update signals the routingTable to Update its last-seen status
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
375
// on the given peer.
376
func (dht *IpfsDHT) Update(ctx context.Context, p peer.ID) {
Matt Joiner's avatar
Matt Joiner committed
377
	logger.Event(ctx, "updatePeer", p)
378
	dht.routingTable.Update(p)
379
}
Jeromy's avatar
Jeromy committed
380

Jeromy's avatar
Jeromy committed
381
// FindLocal looks for a peer with a given ID connected to this dht and returns the peer and the table it was found in.
382
func (dht *IpfsDHT) FindLocal(id peer.ID) peer.AddrInfo {
383
	switch dht.host.Network().Connectedness(id) {
384
	case network.Connected, network.CanConnect:
385 386
		return dht.peerstore.PeerInfo(id)
	default:
387
		return peer.AddrInfo{}
Jeromy's avatar
Jeromy committed
388 389
	}
}
390

Jeromy's avatar
Jeromy committed
391
// findPeerSingle asks peer 'p' if they know where the peer with id 'id' is
392
func (dht *IpfsDHT) findPeerSingle(ctx context.Context, p peer.ID, id peer.ID) (*pb.Message, error) {
Matt Joiner's avatar
Matt Joiner committed
393
	eip := logger.EventBegin(ctx, "findPeerSingle",
394 395 396 397
		logging.LoggableMap{
			"peer":   p,
			"target": id,
		})
ForrestWeston's avatar
ForrestWeston committed
398
	defer eip.Done()
399

400
	pmes := pb.NewMessage(pb.Message_FIND_NODE, []byte(id), 0)
401 402 403 404 405
	resp, err := dht.sendRequest(ctx, p, pmes)
	switch err {
	case nil:
		return resp, nil
	case ErrReadTimeout:
Matt Joiner's avatar
Matt Joiner committed
406
		logger.Warningf("read timeout: %s %s", p.Pretty(), id)
407 408
		fallthrough
	default:
ForrestWeston's avatar
ForrestWeston committed
409
		eip.SetError(err)
410 411
		return nil, err
	}
412
}
413

Adin Schmahmann's avatar
Adin Schmahmann committed
414 415
func (dht *IpfsDHT) findProvidersSingle(ctx context.Context, p peer.ID, key multihash.Multihash) (*pb.Message, error) {
	eip := logger.EventBegin(ctx, "findProvidersSingle", p, multihashLoggableKey(key))
ForrestWeston's avatar
ForrestWeston committed
416
	defer eip.Done()
417

Adin Schmahmann's avatar
Adin Schmahmann committed
418
	pmes := pb.NewMessage(pb.Message_GET_PROVIDERS, key, 0)
419 420 421 422 423
	resp, err := dht.sendRequest(ctx, p, pmes)
	switch err {
	case nil:
		return resp, nil
	case ErrReadTimeout:
Adin Schmahmann's avatar
Adin Schmahmann committed
424
		logger.Warningf("read timeout: %s %s", p.Pretty(), key)
425 426
		fallthrough
	default:
ForrestWeston's avatar
ForrestWeston committed
427
		eip.SetError(err)
428 429
		return nil, err
	}
Jeromy's avatar
Jeromy committed
430 431
}

432
// nearestPeersToQuery returns the routing tables closest peers.
433
func (dht *IpfsDHT) nearestPeersToQuery(pmes *pb.Message, count int) []peer.ID {
434
	closer := dht.routingTable.NearestPeers(kb.ConvertKey(string(pmes.GetKey())), count)
435 436 437
	return closer
}

Aarsh Shah's avatar
Aarsh Shah committed
438
// betterPeersToQuery returns nearestPeersToQuery with some additional filtering
439
func (dht *IpfsDHT) betterPeersToQuery(pmes *pb.Message, p peer.ID, count int) []peer.ID {
440
	closer := dht.nearestPeersToQuery(pmes, count)
441 442 443

	// no node? nil
	if closer == nil {
Matt Joiner's avatar
Matt Joiner committed
444
		logger.Warning("betterPeersToQuery: no closer peers to send:", p)
445 446 447
		return nil
	}

Steven Allen's avatar
Steven Allen committed
448
	filtered := make([]peer.ID, 0, len(closer))
Jeromy's avatar
Jeromy committed
449 450 451
	for _, clp := range closer {

		// == to self? thats bad
Jeromy's avatar
Jeromy committed
452
		if clp == dht.self {
Matt Joiner's avatar
Matt Joiner committed
453
			logger.Error("BUG betterPeersToQuery: attempted to return self! this shouldn't happen...")
454 455
			return nil
		}
456
		// Dont send a peer back themselves
Jeromy's avatar
Jeromy committed
457
		if clp == p {
458 459 460
			continue
		}

Jeromy's avatar
Jeromy committed
461
		filtered = append(filtered, clp)
462 463
	}

464 465
	// ok seems like closer nodes
	return filtered
466 467
}

468 469 470 471 472 473 474 475 476 477 478 479 480 481 482 483 484 485 486 487 488 489 490 491 492 493 494 495 496 497 498 499 500 501 502 503 504 505 506 507 508 509 510 511 512 513 514 515 516 517 518 519 520 521 522
// setMode switches the DHT to mode m while holding modeLk. It is a no-op when
// the DHT is already in that mode and returns an error for unrecognized modes.
func (dht *IpfsDHT) setMode(m mode) error {
	dht.modeLk.Lock()
	defer dht.modeLk.Unlock()

	switch {
	case m == dht.mode:
		return nil
	case m == modeServer:
		return dht.moveToServerMode()
	case m == modeClient:
		return dht.moveToClientMode()
	}
	return fmt.Errorf("unrecognized dht mode: %d", m)
}

// moveToServerMode marks the DHT as being in server mode and registers
// handleNewStream as the host's stream handler for every DHT protocol.
// It does not take modeLk itself. It always returns nil.
func (dht *IpfsDHT) moveToServerMode() error {
	dht.mode = modeServer
	for _, p := range dht.protocols {
		dht.host.SetStreamHandler(p, dht.handleNewStream)
	}
	return nil
}

// moveToClientMode marks the DHT as being in client mode, removes the host's
// stream handlers for every DHT protocol and resets all inbound streams that
// are currently open on one of those protocols. It does not take modeLk
// itself. It always returns nil.
func (dht *IpfsDHT) moveToClientMode() error {
	dht.mode = modeClient

	// Unregister the handlers and remember which protocols are ours.
	ours := make(map[protocol.ID]bool)
	for _, p := range dht.protocols {
		dht.host.RemoveStreamHandler(p)
		ours[p] = true
	}

	// Reset inbound streams that speak one of our protocols.
	for _, conn := range dht.host.Network().Conns() {
		for _, stream := range conn.GetStreams() {
			if ours[stream.Protocol()] && stream.Stat().Direction == network.DirInbound {
				stream.Reset()
			}
		}
	}
	return nil
}

// getMode returns the DHT's current mode, reading it while holding modeLk.
func (dht *IpfsDHT) getMode() mode {
	dht.modeLk.Lock()
	defer dht.modeLk.Unlock()
	return dht.mode
}

523 524 525 526 527 528 529 530 531 532
// Context returns the DHT's context.
func (dht *IpfsDHT) Context() context.Context {
	return dht.ctx
}

// Process returns the DHT's process.
func (dht *IpfsDHT) Process() goprocess.Process {
	return dht.proc
}

ZhengQi's avatar
ZhengQi committed
533 534 535 536 537
// RoutingTable returns the DHT's routing table.
func (dht *IpfsDHT) RoutingTable() *kb.RoutingTable {
	return dht.routingTable
}

538 539 540 541
// Close closes the DHT's process and returns the error from that call.
func (dht *IpfsDHT) Close() error {
	return dht.proc.Close()
}
542

543 544
func (dht *IpfsDHT) protocolStrs() []string {
	pstrs := make([]string, len(dht.protocols))
545 546
	for idx, proto := range dht.protocols {
		pstrs[idx] = string(proto)
547 548 549 550 551
	}

	return pstrs
}

552 553 554
// mkDsKey converts the string s into a datastore key by encoding its bytes
// with base32.RawStdEncoding.
func mkDsKey(s string) ds.Key {
	return ds.NewKey(base32.RawStdEncoding.EncodeToString([]byte(s)))
}
555 556 557 558 559 560 561 562 563 564 565 566 567 568 569 570 571

// PeerID returns the ID of the local peer.
func (dht *IpfsDHT) PeerID() peer.ID {
	return dht.self
}

// PeerKey returns the local peer's ID converted with kb.ConvertPeerID
// (presumably the key form used by the routing table; confirm against the
// kbucket package).
func (dht *IpfsDHT) PeerKey() []byte {
	return kb.ConvertPeerID(dht.self)
}

// Host returns the host this DHT was created with.
func (dht *IpfsDHT) Host() host.Host {
	return dht.host
}

func (dht *IpfsDHT) Ping(ctx context.Context, p peer.ID) error {
	req := pb.NewMessage(pb.Message_PING, nil, 0)
	resp, err := dht.sendRequest(ctx, p, req)
	if err != nil {
Steven Allen's avatar
Steven Allen committed
572
		return fmt.Errorf("sending request: %w", err)
573 574
	}
	if resp.Type != pb.Message_PING {
Steven Allen's avatar
Steven Allen committed
575
		return fmt.Errorf("got unexpected response type: %v", resp.Type)
576 577 578
	}
	return nil
}
579 580 581 582 583 584 585 586 587 588 589 590 591 592 593 594

// newContextWithLocalTags returns a new context.Context with the InstanceID and
// PeerID keys populated. It will also take any extra tags that need adding to
// the context as tag.Mutators; these are applied before the local tags.
func (dht *IpfsDHT) newContextWithLocalTags(ctx context.Context, extraTags ...tag.Mutator) context.Context {
	// Build a fresh slice instead of appending to extraTags: when a caller
	// spreads an existing slice into this call (tags...), appending in place
	// could overwrite elements in the caller's backing array.
	mutators := make([]tag.Mutator, 0, len(extraTags)+2)
	mutators = append(mutators, extraTags...)
	mutators = append(
		mutators,
		tag.Upsert(metrics.KeyPeerID, dht.self.Pretty()),
		tag.Upsert(metrics.KeyInstanceID, fmt.Sprintf("%p", dht)),
	)
	// Ignoring the error as it is unrelated to the actual function of this code.
	ctx, _ = tag.New(ctx, mutators...)
	return ctx
}