dht.go 19.8 KB
Newer Older
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
1 2
package dht

3
import (
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
4
	"bytes"
Jeromy's avatar
Jeromy committed
5
	"context"
6
	"errors"
7
	"fmt"
Aarsh Shah's avatar
Aarsh Shah committed
8
	"math"
9 10
	"sync"
	"time"
11

12 13 14 15 16 17 18
	"github.com/libp2p/go-libp2p-core/host"
	"github.com/libp2p/go-libp2p-core/network"
	"github.com/libp2p/go-libp2p-core/peer"
	"github.com/libp2p/go-libp2p-core/peerstore"
	"github.com/libp2p/go-libp2p-core/protocol"
	"github.com/libp2p/go-libp2p-core/routing"

19
	"go.opencensus.io/tag"
20

21
	"github.com/libp2p/go-libp2p-kad-dht/metrics"
22
	pb "github.com/libp2p/go-libp2p-kad-dht/pb"
Aarsh Shah's avatar
Aarsh Shah committed
23
	"github.com/libp2p/go-libp2p-kad-dht/providers"
Aarsh Shah's avatar
Aarsh Shah committed
24 25 26
	kb "github.com/libp2p/go-libp2p-kbucket"
	record "github.com/libp2p/go-libp2p-record"
	recpb "github.com/libp2p/go-libp2p-record/pb"
27

Aarsh Shah's avatar
Aarsh Shah committed
28
	"github.com/gogo/protobuf/proto"
29 30
	ds "github.com/ipfs/go-datastore"
	logging "github.com/ipfs/go-log"
Aarsh Shah's avatar
Aarsh Shah committed
31
	"github.com/jbenet/goprocess"
Henrique Dias's avatar
Henrique Dias committed
32
	goprocessctx "github.com/jbenet/goprocess/context"
Steven Allen's avatar
Steven Allen committed
33
	"github.com/multiformats/go-base32"
Adin Schmahmann's avatar
Adin Schmahmann committed
34
	"github.com/multiformats/go-multihash"
35 36
)

Matt Joiner's avatar
Matt Joiner committed
37
// logger is the package-level logger, registered under the "dht" name.
var logger = logging.Logger("dht")
38

Henrique Dias's avatar
Henrique Dias committed
39 40
// BaseConnMgrScore is the base value of the "kbucket" connection-manager tag
// applied to peers added to the routing table; the length of the key prefix
// a peer shares with the local peer is added on top of it.
const BaseConnMgrScore = 5

41 42 43
// mode is the internal operating mode of a DHT instance.
type mode int

// Operating modes. Numbering starts at 1, so the zero value of mode matches
// neither constant.
const (
	modeServer mode = 1 + iota
	modeClient
)

Adin Schmahmann's avatar
Adin Schmahmann committed
48 49 50 51 52
// Protocol ID suffixes for the two Kademlia protocol versions used in this
// file; the configured protocol prefix is prepended to them when a DHT is built.
const (
	kad1 protocol.ID = "/kad/1.0.0"
	kad2 protocol.ID = "/kad/2.0.0"
)

53
// IpfsDHT is an implementation of Kademlia with S/Kademlia modifications.
54
// It is used to implement the base Routing module.
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
55
type IpfsDHT struct {
56 57 58
	host      host.Host           // the network services we need
	self      peer.ID             // Local peer (yourself)
	peerstore peerstore.Peerstore // Peer Registry
59

60
	datastore ds.Datastore // Local data
61

62
	routingTable *kb.RoutingTable // Array of routing tables for differently distanced nodes
63 64
	// ProviderManager stores & manages the provider records for this Dht peer.
	ProviderManager *providers.ProviderManager
65

Aarsh Shah's avatar
Aarsh Shah committed
66
	birth time.Time // When this peer started up
67

68
	Validator record.Validator
69

70 71
	ctx  context.Context
	proc goprocess.Process
72 73 74

	strmap map[peer.ID]*messageSender
	smlk   sync.Mutex
75

Steven Allen's avatar
Steven Allen committed
76
	plk sync.Mutex
77

78 79
	stripedPutLocks [256]sync.Mutex

Adin Schmahmann's avatar
Adin Schmahmann committed
80 81 82 83 84
	// Primary DHT protocols - we query and respond to these protocols
	protocols []protocol.ID

	// DHT protocols we can respond to (may contain protocols in addition to the primary protocols)
	serverProtocols []protocol.ID
85

86 87 88 89
	auto   bool
	mode   mode
	modeLk sync.Mutex

90
	bucketSize int
91
	alpha      int // The concurrency parameter per path
Adin Schmahmann's avatar
Adin Schmahmann committed
92
	beta       int // The number of peers closest to a target that must have responded for a query path to terminate
93

94 95 96
	queryPeerFilter        QueryFilterFunc
	routingTablePeerFilter RouteTableFilterFunc

97 98
	autoRefresh           bool
	rtRefreshQueryTimeout time.Duration
Aarsh Shah's avatar
Aarsh Shah committed
99
	rtRefreshInterval     time.Duration
Steven Allen's avatar
Steven Allen committed
100
	triggerRtRefresh      chan chan<- error
Aarsh Shah's avatar
Aarsh Shah committed
101
	triggerSelfLookup     chan chan<- error
Aarsh Shah's avatar
Aarsh Shah committed
102 103

	maxRecordAge time.Duration
104

105 106 107
	// Allows disabling dht subsystems. These should _only_ be set on
	// "forked" DHTs (e.g., DHTs with custom protocols and/or private
	// networks).
108
	enableProviders, enableValues bool
109 110 111 112 113 114 115

	// maxLastSuccessfulOutboundThreshold is the max threshold/upper limit for the value of "lastSuccessfulOutboundQuery"
	// of the peer in the bucket above which we will evict it to make place for a new peer if the bucket
	// is full
	maxLastSuccessfulOutboundThreshold time.Duration

	fixLowPeersChan chan struct{}
116 117
}

Matt Joiner's avatar
Matt Joiner committed
118 119 120 121
// Assert that IPFS assumptions about interfaces aren't broken. These aren't a
// guarantee, but we can use them to aid refactoring.
var (
	_ routing.ContentRouting = (*IpfsDHT)(nil)
122
	_ routing.Routing        = (*IpfsDHT)(nil)
Matt Joiner's avatar
Matt Joiner committed
123 124 125 126 127
	_ routing.PeerRouting    = (*IpfsDHT)(nil)
	_ routing.PubKeyFetcher  = (*IpfsDHT)(nil)
	_ routing.ValueStore     = (*IpfsDHT)(nil)
)

128
// New creates a new DHT with the specified host and options.
Aarsh Shah's avatar
Aarsh Shah committed
129 130 131
// Please note that being connected to a DHT peer does not necessarily imply that it's also in the DHT Routing Table.
// If the Routing Table has more than "minRTRefreshThreshold" peers, we consider a peer as a Routing Table candidate ONLY when
// we successfully get a query response from it OR if it send us a query.
132 133
func New(ctx context.Context, h host.Host, options ...Option) (*IpfsDHT, error) {
	var cfg config
Adin Schmahmann's avatar
Adin Schmahmann committed
134
	if err := cfg.apply(append([]Option{defaults}, options...)...); err != nil {
135 136
		return nil, err
	}
Aarsh Shah's avatar
Aarsh Shah committed
137

Adin Schmahmann's avatar
Adin Schmahmann committed
138 139
	if err := cfg.validate(); err != nil {
		return nil, err
140
	}
Aarsh Shah's avatar
Aarsh Shah committed
141 142 143 144
	dht, err := makeDHT(ctx, h, cfg)
	if err != nil {
		return nil, fmt.Errorf("failed to create DHT, err=%s", err)
	}
Adin Schmahmann's avatar
Adin Schmahmann committed
145

146
	dht.autoRefresh = cfg.routingTable.autoRefresh
Aarsh Shah's avatar
Aarsh Shah committed
147
	dht.rtRefreshInterval = cfg.routingTable.refreshInterval
148
	dht.rtRefreshQueryTimeout = cfg.routingTable.refreshQueryTimeout
149

150 151 152
	dht.maxRecordAge = cfg.maxRecordAge
	dht.enableProviders = cfg.enableProviders
	dht.enableValues = cfg.enableValues
Aarsh Shah's avatar
Aarsh Shah committed
153

154
	dht.Validator = cfg.validator
155

156 157
	switch cfg.mode {
	case ModeAuto:
158 159
		dht.auto = true
		dht.mode = modeClient
160
	case ModeClient:
161 162
		dht.auto = false
		dht.mode = modeClient
163
	case ModeServer:
164 165 166
		dht.auto = false
		dht.mode = modeServer
	default:
167
		return nil, fmt.Errorf("invalid dht mode %d", cfg.mode)
168 169 170 171 172
	}

	if dht.mode == modeServer {
		if err := dht.moveToServerMode(); err != nil {
			return nil, err
173
		}
174
	}
175 176 177 178 179 180 181 182 183 184

	// register for event bus and network notifications
	sn, err := newSubscriberNotifiee(dht)
	if err != nil {
		return nil, err
	}
	dht.proc.Go(sn.subscribe)
	// handle providers
	dht.proc.AddChild(dht.ProviderManager.Process())

Aarsh Shah's avatar
Aarsh Shah committed
185
	dht.startSelfLookup()
186
	dht.startRefreshing()
Aarsh Shah's avatar
Aarsh Shah committed
187 188 189 190

	// go-routine to make sure we ALWAYS have RT peer addresses in the peerstore
	// since RT membership is decoupled from connectivity
	go dht.persistRTPeersInPeerStore()
191 192 193 194

	// listens to the fix low peers chan and tries to fix the Routing Table
	dht.proc.Go(dht.fixLowPeersRoutine)

195 196
	return dht, nil
}
197

198 199 200 201
// NewDHT creates a new DHT object with the given peer as the 'local' host.
// IpfsDHT's initialized with this function will respond to DHT requests,
// whereas IpfsDHT's initialized with NewDHTClient will not.
func NewDHT(ctx context.Context, h host.Host, dstore ds.Batching) *IpfsDHT {
202
	dht, err := New(ctx, h, Datastore(dstore))
203 204 205 206 207 208 209 210 211 212 213
	if err != nil {
		panic(err)
	}
	return dht
}

// NewDHTClient creates a new DHT object with the given peer as the 'local'
// host. IpfsDHT clients initialized with this function will not respond to DHT
// requests. If you need a peer to respond to DHT requests, use NewDHT instead.
// NewDHTClient creates a new DHT object with the given peer as the 'local' host
func NewDHTClient(ctx context.Context, h host.Host, dstore ds.Batching) *IpfsDHT {
Adin Schmahmann's avatar
Adin Schmahmann committed
214
	dht, err := New(ctx, h, Datastore(dstore), Mode(ModeClient))
215 216 217
	if err != nil {
		panic(err)
	}
218 219 220
	return dht
}

Aarsh Shah's avatar
Aarsh Shah committed
221
func makeDHT(ctx context.Context, h host.Host, cfg config) (*IpfsDHT, error) {
Adin Schmahmann's avatar
Adin Schmahmann committed
222 223 224 225 226 227 228 229 230 231 232 233 234
	protocols := []protocol.ID{cfg.protocolPrefix + kad2}
	serverProtocols := []protocol.ID{cfg.protocolPrefix + kad2, cfg.protocolPrefix + kad1}

	// check if custom test protocols were set
	if len(cfg.testProtocols) > 0 {
		protocols = make([]protocol.ID, len(cfg.testProtocols))
		serverProtocols = make([]protocol.ID, len(cfg.testProtocols))
		for i, p := range cfg.testProtocols {
			protocols[i] = cfg.protocolPrefix + p
			serverProtocols[i] = cfg.protocolPrefix + p
		}
	}

235
	dht := &IpfsDHT{
236 237 238 239 240 241 242 243 244 245 246 247 248 249 250
		datastore:              cfg.datastore,
		self:                   h.ID(),
		peerstore:              h.Peerstore(),
		host:                   h,
		strmap:                 make(map[peer.ID]*messageSender),
		birth:                  time.Now(),
		protocols:              protocols,
		serverProtocols:        serverProtocols,
		bucketSize:             cfg.bucketSize,
		alpha:                  cfg.concurrency,
		beta:                   cfg.resiliency,
		triggerRtRefresh:       make(chan chan<- error),
		triggerSelfLookup:      make(chan chan<- error),
		queryPeerFilter:        cfg.queryPeerFilter,
		routingTablePeerFilter: cfg.routingTable.peerFilter,
251
		fixLowPeersChan:        make(chan struct{}),
Jeromy's avatar
Jeromy committed
252
	}
253

Aarsh Shah's avatar
Aarsh Shah committed
254 255 256 257 258 259 260
	// construct routing table
	rt, err := makeRoutingTable(dht, cfg)
	if err != nil {
		return nil, fmt.Errorf("failed to construct routing table,err=%s", err)
	}
	dht.routingTable = rt

261 262
	// create a DHT proc with the given context
	dht.proc = goprocessctx.WithContext(ctx)
Aarsh Shah's avatar
Aarsh Shah committed
263 264 265 266 267 268

	// create a tagged context derived from the original context
	ctxTags := dht.newContextWithLocalTags(ctx)
	// the DHT context should be done when the process is closed
	dht.ctx = goprocessctx.WithProcessClosing(ctxTags, dht.proc)

269
	dht.ProviderManager = providers.NewProviderManager(dht.ctx, h.ID(), cfg.datastore)
270

Aarsh Shah's avatar
Aarsh Shah committed
271
	return dht, nil
Jeromy's avatar
Jeromy committed
272 273
}

Aarsh Shah's avatar
Aarsh Shah committed
274
func makeRoutingTable(dht *IpfsDHT, cfg config) (*kb.RoutingTable, error) {
Aarsh Shah's avatar
Aarsh Shah committed
275 276 277 278 279 280 281
	// The threshold is calculated based on the expected amount of time that should pass before we
	// query a peer as part of our refresh cycle.
	// To grok the Math Wizardy that produced these exact equations, please be patient as a document explaining it will
	// be published soon.
	l1 := math.Log(float64(1) / float64(defaultBucketSize))                              //(Log(1/K))
	l2 := math.Log(float64(1) - (float64(cfg.concurrency) / float64(defaultBucketSize))) // Log(1 - (alpha / K))
	maxLastSuccessfulOutboundThreshold := l1 / l2 * float64(cfg.routingTable.refreshInterval)
Aarsh Shah's avatar
Aarsh Shah committed
282

Aarsh Shah's avatar
Aarsh Shah committed
283
	self := kb.ConvertPeerID(dht.host.ID())
Aarsh Shah's avatar
Aarsh Shah committed
284

Aarsh Shah's avatar
Aarsh Shah committed
285
	rt, err := kb.NewRoutingTable(cfg.bucketSize, self, time.Minute, dht.host.Peerstore(), maxLastSuccessfulOutboundThreshold)
Aarsh Shah's avatar
Aarsh Shah committed
286
	cmgr := dht.host.ConnManager()
Aarsh Shah's avatar
Aarsh Shah committed
287 288 289 290 291 292 293

	rt.PeerAdded = func(p peer.ID) {
		commonPrefixLen := kb.CommonPrefixLen(self, kb.ConvertPeerID(p))
		cmgr.TagPeer(p, "kbucket", BaseConnMgrScore+commonPrefixLen)
	}
	rt.PeerRemoved = func(p peer.ID) {
		cmgr.UntagPeer(p, "kbucket")
294 295 296

		// try to fix the RT
		dht.fixRTIfNeeded()
Aarsh Shah's avatar
Aarsh Shah committed
297 298 299 300
	}

	return rt, err
}
301

302 303 304 305 306 307 308 309 310 311 312 313 314 315 316 317 318 319 320 321 322 323 324 325 326 327
// fixLowPeersRoutine waits for signals on fixLowPeersChan and, whenever the
// routing table holds no more than minRTRefreshThreshold peers, offers every
// peer the host knows on its network to the routing table and, if auto
// refresh is enabled, makes a non-blocking request for a routing table
// refresh. It returns when proc starts closing.
func (dht *IpfsDHT) fixLowPeersRoutine(proc goprocess.Process) {
	for {
		select {
		case <-proc.Closing():
			return
		case <-dht.fixLowPeersChan:
		}

		if dht.routingTable.Size() <= minRTRefreshThreshold {
			connected := dht.host.Network().Peers()
			for i := range connected {
				dht.peerFound(dht.Context(), connected[i], false)
			}

			if !dht.autoRefresh {
				continue
			}
			// Do not block if nobody is ready to receive the trigger.
			select {
			case dht.triggerRtRefresh <- nil:
			default:
			}
		}
	}
}

Aarsh Shah's avatar
Aarsh Shah committed
328 329 330 331 332 333 334 335 336 337 338 339 340 341 342 343 344 345 346
// persistRTPeersInPeerStore periodically refreshes the peerstore address TTLs
// of every peer in the routing table until the DHT context is done.
//
// TODO: this is a workaround and should be removed once
// https://github.com/libp2p/go-libp2p/issues/800 goes in.
func (dht *IpfsDHT) persistRTPeersInPeerStore() {
	ticker := time.NewTicker(peerstore.RecentlyConnectedAddrTTL / 3)
	defer ticker.Stop()

	for {
		select {
		case <-dht.ctx.Done():
			return
		case <-ticker.C:
		}

		for _, id := range dht.routingTable.ListPeers() {
			dht.peerstore.UpdateAddrs(id, peerstore.RecentlyConnectedAddrTTL, peerstore.RecentlyConnectedAddrTTL)
		}
	}
}

Jeromy's avatar
Jeromy committed
347
// putValueToPeer stores the given key/value pair at the peer 'p'
348 349
func (dht *IpfsDHT) putValueToPeer(ctx context.Context, p peer.ID, rec *recpb.Record) error {
	pmes := pb.NewMessage(pb.Message_PUT_VALUE, rec.Key, 0)
350
	pmes.Record = rec
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
351
	rpmes, err := dht.sendRequest(ctx, p, pmes)
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
352
	if err != nil {
Steven Allen's avatar
Steven Allen committed
353
		logger.Debugw("failed to put value to peer", "to", p, "key", loggableKeyBytes(rec.Key), "error", err)
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
354 355
		return err
	}
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
356

357
	if !bytes.Equal(rpmes.GetRecord().Value, pmes.GetRecord().Value) {
Steven Allen's avatar
Steven Allen committed
358
		logger.Infow("value not put correctly", "put-message", pmes, "get-message", rpmes)
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
359 360
		return errors.New("value not put correctly")
	}
gpestana's avatar
gpestana committed
361

Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
362
	return nil
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
363 364
}

365 366
// errInvalidRecord is the sentinel error returned by getValueOrPeers when the
// record received from a peer fails validation.
var errInvalidRecord = errors.New("received invalid record")

367 368
// getValueOrPeers queries a particular peer p for the value for
// key. It returns either the value or a list of closer peers.
369
// NOTE: It will update the dht's peerstore with any new addresses
370
// it finds for the given peer.
371
func (dht *IpfsDHT) getValueOrPeers(ctx context.Context, p peer.ID, key string) (*recpb.Record, []*peer.AddrInfo, error) {
372
	pmes, err := dht.getValueSingle(ctx, p, key)
373
	if err != nil {
374
		return nil, nil, err
375 376
	}

377 378 379
	// Perhaps we were given closer peers
	peers := pb.PBPeersToPeerInfos(pmes.GetCloserPeers())

380
	if record := pmes.GetRecord(); record != nil {
381
		// Success! We were given the value
Steven Allen's avatar
Steven Allen committed
382
		logger.Debug("got value")
383

384
		// make sure record is valid.
385
		err = dht.Validator.Validate(string(record.GetKey()), record.GetValue())
386
		if err != nil {
Steven Allen's avatar
Steven Allen committed
387
			logger.Debug("received invalid record (discarded)")
388 389
			// return a sentinal to signify an invalid record was received
			err = errInvalidRecord
George Antoniadis's avatar
George Antoniadis committed
390
			record = new(recpb.Record)
391
		}
392
		return record, peers, err
393
	}
394

395 396 397 398
	if len(peers) > 0 {
		return nil, peers, nil
	}

399
	return nil, nil, routing.ErrNotFound
400 401
}

402
// getValueSingle simply performs the get value RPC with the given parameters
403
func (dht *IpfsDHT) getValueSingle(ctx context.Context, p peer.ID, key string) (*pb.Message, error) {
404
	pmes := pb.NewMessage(pb.Message_GET_VALUE, []byte(key), 0)
Steven Allen's avatar
Steven Allen committed
405
	return dht.sendRequest(ctx, p, pmes)
Jeromy's avatar
Jeromy committed
406 407
}

408
// getLocal attempts to retrieve the value from the datastore
409
func (dht *IpfsDHT) getLocal(key string) (*recpb.Record, error) {
Steven Allen's avatar
Steven Allen committed
410 411
	logger.Debugw("finding value in datastore", "key", loggableKeyString(key))

412
	rec, err := dht.getRecordFromDatastore(mkDsKey(key))
413
	if err != nil {
Steven Allen's avatar
Steven Allen committed
414
		logger.Warnw("get local failed", "key", key, "error", err)
415 416
		return nil, err
	}
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
417

418
	// Double check the key. Can't hurt.
419
	if rec != nil && string(rec.GetKey()) != key {
Steven Allen's avatar
Steven Allen committed
420
		logger.Errorw("BUG: found a DHT record that didn't match it's key", "expected", key, "got", rec.GetKey())
Steven Allen's avatar
Steven Allen committed
421
		return nil, nil
422 423

	}
424
	return rec, nil
425 426
}

427
// putLocal stores the key value pair in the datastore
428
func (dht *IpfsDHT) putLocal(key string, rec *recpb.Record) error {
429 430
	data, err := proto.Marshal(rec)
	if err != nil {
Steven Allen's avatar
Steven Allen committed
431
		logger.Warnw("failed to put marshal record for local put", "error", err, "key", key)
432 433 434
		return err
	}

435
	return dht.datastore.Put(mkDsKey(key), data)
436
}
437

Aarsh Shah's avatar
Aarsh Shah committed
438
// peerFound signals the routingTable that we've found a peer that
Aarsh Shah's avatar
Aarsh Shah committed
439 440
// might support the DHT protocol.
func (dht *IpfsDHT) peerFound(ctx context.Context, p peer.ID, queryPeer bool) {
Steven Allen's avatar
Steven Allen committed
441
	logger.Debugw("peer found", "peer", p)
Aarsh Shah's avatar
Aarsh Shah committed
442 443
	b, err := dht.validRTPeer(p)
	if err != nil {
Steven Allen's avatar
Steven Allen committed
444
		logger.Errorw("failed to validate if peer is a DHT peer", "peer", p, "error", err)
Aarsh Shah's avatar
Aarsh Shah committed
445
	} else if b {
Steven Allen's avatar
Steven Allen committed
446 447 448 449 450
		_, err := dht.routingTable.TryAddPeer(p, queryPeer)
		if err != nil {
			// peer not added.
			return
		}
Aarsh Shah's avatar
Aarsh Shah committed
451 452 453 454 455 456 457

		// If we discovered the peer because of a query, we need to ensure we override the "zero" lastSuccessfulOutboundQuery
		// value that must have been set in the Routing Table for this peer when it was first added during a connection.
		if queryPeer {
			dht.routingTable.UpdateLastSuccessfulOutboundQuery(p, time.Now())
		}
	}
Aarsh Shah's avatar
Aarsh Shah committed
458 459
}

Aarsh Shah's avatar
Aarsh Shah committed
460
// peerStoppedDHT signals the routing table that a peer is unable to responsd to DHT queries anymore.
Aarsh Shah's avatar
Aarsh Shah committed
461
func (dht *IpfsDHT) peerStoppedDHT(ctx context.Context, p peer.ID) {
Steven Allen's avatar
Steven Allen committed
462
	logger.Debugw("peer stopped dht", "peer", p)
Aarsh Shah's avatar
Aarsh Shah committed
463 464
	// A peer that does not support the DHT protocol is dead for us.
	// There's no point in talking to anymore till it starts supporting the DHT protocol again.
Aarsh Shah's avatar
Aarsh Shah committed
465 466 467
	dht.routingTable.RemovePeer(p)

	// since we lost a peer from the RT, we should do this here
468
	dht.fixRTIfNeeded()
Aarsh Shah's avatar
Aarsh Shah committed
469 470
}

471 472 473 474
func (dht *IpfsDHT) fixRTIfNeeded() {
	select {
	case dht.fixLowPeersChan <- struct{}{}:
	default:
Aarsh Shah's avatar
Aarsh Shah committed
475
	}
476
}
Jeromy's avatar
Jeromy committed
477

Jeromy's avatar
Jeromy committed
478
// FindLocal looks for a peer with a given ID connected to this dht and returns the peer and the table it was found in.
479
func (dht *IpfsDHT) FindLocal(id peer.ID) peer.AddrInfo {
480
	switch dht.host.Network().Connectedness(id) {
481
	case network.Connected, network.CanConnect:
482 483
		return dht.peerstore.PeerInfo(id)
	default:
484
		return peer.AddrInfo{}
Jeromy's avatar
Jeromy committed
485 486
	}
}
487

Jeromy's avatar
Jeromy committed
488
// findPeerSingle asks peer 'p' if they know where the peer with id 'id' is
489
func (dht *IpfsDHT) findPeerSingle(ctx context.Context, p peer.ID, id peer.ID) (*pb.Message, error) {
490
	pmes := pb.NewMessage(pb.Message_FIND_NODE, []byte(id), 0)
Steven Allen's avatar
Steven Allen committed
491
	return dht.sendRequest(ctx, p, pmes)
492
}
493

Adin Schmahmann's avatar
Adin Schmahmann committed
494 495
func (dht *IpfsDHT) findProvidersSingle(ctx context.Context, p peer.ID, key multihash.Multihash) (*pb.Message, error) {
	pmes := pb.NewMessage(pb.Message_GET_PROVIDERS, key, 0)
Steven Allen's avatar
Steven Allen committed
496
	return dht.sendRequest(ctx, p, pmes)
Jeromy's avatar
Jeromy committed
497 498
}

499
// nearestPeersToQuery returns the routing tables closest peers.
500
func (dht *IpfsDHT) nearestPeersToQuery(pmes *pb.Message, count int) []peer.ID {
501
	closer := dht.routingTable.NearestPeers(kb.ConvertKey(string(pmes.GetKey())), count)
502 503 504
	return closer
}

Aarsh Shah's avatar
Aarsh Shah committed
505
// betterPeersToQuery returns nearestPeersToQuery with some additional filtering
506
func (dht *IpfsDHT) betterPeersToQuery(pmes *pb.Message, from peer.ID, count int) []peer.ID {
507
	closer := dht.nearestPeersToQuery(pmes, count)
508 509 510

	// no node? nil
	if closer == nil {
Steven Allen's avatar
Steven Allen committed
511
		logger.Infow("no closer peers to send", from)
512 513 514
		return nil
	}

Steven Allen's avatar
Steven Allen committed
515
	filtered := make([]peer.ID, 0, len(closer))
Jeromy's avatar
Jeromy committed
516 517 518
	for _, clp := range closer {

		// == to self? thats bad
Jeromy's avatar
Jeromy committed
519
		if clp == dht.self {
Matt Joiner's avatar
Matt Joiner committed
520
			logger.Error("BUG betterPeersToQuery: attempted to return self! this shouldn't happen...")
521 522
			return nil
		}
523
		// Dont send a peer back themselves
524
		if clp == from {
525 526 527
			continue
		}

Jeromy's avatar
Jeromy committed
528
		filtered = append(filtered, clp)
529 530
	}

531 532
	// ok seems like closer nodes
	return filtered
533 534
}

535 536 537 538 539 540 541 542 543 544 545 546 547 548 549 550 551 552
// setMode switches the DHT to mode m while holding modeLk. It is a no-op if
// the DHT is already in that mode and returns an error for an unknown mode.
func (dht *IpfsDHT) setMode(m mode) error {
	dht.modeLk.Lock()
	defer dht.modeLk.Unlock()

	switch {
	case m == dht.mode:
		return nil
	case m == modeServer:
		return dht.moveToServerMode()
	case m == modeClient:
		return dht.moveToClientMode()
	}
	return fmt.Errorf("unrecognized dht mode: %d", m)
}

Adin Schmahmann's avatar
Adin Schmahmann committed
553 554 555
// moveToServerMode advertises (via libp2p identify updates) that we are able to respond to DHT queries and sets the appropriate stream handlers.
// Note: We may support responding to queries with protocols aside from our primary ones in order to support
// interoperability with older versions of the DHT protocol.
556 557
func (dht *IpfsDHT) moveToServerMode() error {
	dht.mode = modeServer
Adin Schmahmann's avatar
Adin Schmahmann committed
558
	for _, p := range dht.serverProtocols {
559 560 561 562 563
		dht.host.SetStreamHandler(p, dht.handleNewStream)
	}
	return nil
}

Adin Schmahmann's avatar
Adin Schmahmann committed
564 565 566 567 568
// moveToClientMode stops advertising (and rescinds advertisements via libp2p identify updates) that we are able to
// respond to DHT queries and removes the appropriate stream handlers. We also kill all inbound streams that were
// utilizing the handled protocols.
// Note: We may support responding to queries with protocols aside from our primary ones in order to support
// interoperability with older versions of the DHT protocol.
569 570
func (dht *IpfsDHT) moveToClientMode() error {
	dht.mode = modeClient
Adin Schmahmann's avatar
Adin Schmahmann committed
571
	for _, p := range dht.serverProtocols {
572 573 574 575
		dht.host.RemoveStreamHandler(p)
	}

	pset := make(map[protocol.ID]bool)
Adin Schmahmann's avatar
Adin Schmahmann committed
576
	for _, p := range dht.serverProtocols {
577 578 579 580 581 582 583
		pset[p] = true
	}

	for _, c := range dht.host.Network().Conns() {
		for _, s := range c.GetStreams() {
			if pset[s.Protocol()] {
				if s.Stat().Direction == network.DirInbound {
Steven Allen's avatar
Steven Allen committed
584
					_ = s.Reset()
585 586 587 588 589 590 591 592 593 594 595 596 597
				}
			}
		}
	}
	return nil
}

// getMode returns the DHT's current mode, read while holding modeLk.
func (dht *IpfsDHT) getMode() mode {
	dht.modeLk.Lock()
	defer dht.modeLk.Unlock()
	return dht.mode
}

598 599 600 601 602 603 604 605 606 607
// Context returns the DHT's context.
func (dht *IpfsDHT) Context() context.Context {
	return dht.ctx
}

// Process returns the DHT's process.
func (dht *IpfsDHT) Process() goprocess.Process {
	return dht.proc
}

ZhengQi's avatar
ZhengQi committed
608 609 610 611 612
// RoutingTable returns the DHT's routing table.
func (dht *IpfsDHT) RoutingTable() *kb.RoutingTable {
	return dht.routingTable
}

613 614 615 616
// Close closes the DHT's process and returns the resulting error.
func (dht *IpfsDHT) Close() error {
	return dht.proc.Close()
}
617 618 619 620

// mkDsKey builds the datastore key for s by encoding its bytes with
// base32.RawStdEncoding.
func mkDsKey(s string) ds.Key {
	return ds.NewKey(base32.RawStdEncoding.EncodeToString([]byte(s)))
}
621 622 623 624 625 626 627 628 629 630 631 632 633 634 635 636 637

// PeerID returns the ID of the local peer.
func (dht *IpfsDHT) PeerID() peer.ID {
	return dht.self
}

// PeerKey returns the local peer's ID converted with kb.ConvertPeerID.
func (dht *IpfsDHT) PeerKey() []byte {
	return kb.ConvertPeerID(dht.self)
}

// Host returns the host this DHT was constructed with.
func (dht *IpfsDHT) Host() host.Host {
	return dht.host
}

func (dht *IpfsDHT) Ping(ctx context.Context, p peer.ID) error {
	req := pb.NewMessage(pb.Message_PING, nil, 0)
	resp, err := dht.sendRequest(ctx, p, req)
	if err != nil {
Steven Allen's avatar
Steven Allen committed
638
		return fmt.Errorf("sending request: %w", err)
639 640
	}
	if resp.Type != pb.Message_PING {
Steven Allen's avatar
Steven Allen committed
641
		return fmt.Errorf("got unexpected response type: %v", resp.Type)
642 643 644
	}
	return nil
}
645 646 647 648 649 650 651 652 653 654 655 656 657 658 659 660

// newContextWithLocalTags derives a context from ctx that is tagged with the
// local peer ID and an instance ID (the pointer value of dht), in addition to
// any extraTags supplied by the caller.
func (dht *IpfsDHT) newContextWithLocalTags(ctx context.Context, extraTags ...tag.Mutator) context.Context {
	peerTag := tag.Upsert(metrics.KeyPeerID, dht.self.Pretty())
	instanceTag := tag.Upsert(metrics.KeyInstanceID, fmt.Sprintf("%p", dht))
	mutators := append(extraTags, peerTag, instanceTag)

	// The error is ignored as it is unrelated to the actual function of this code.
	tagged, _ := tag.New(ctx, mutators...)
	return tagged
}