dht.go 14.3 KB
Newer Older
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
1 2
package dht

3
import (
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
4
	"bytes"
Jeromy's avatar
Jeromy committed
5
	"context"
6
	"errors"
7
	"fmt"
8 9
	"sync"
	"time"
10

11
	"github.com/google/uuid"
12 13 14 15 16 17 18
	"github.com/libp2p/go-libp2p-core/host"
	"github.com/libp2p/go-libp2p-core/network"
	"github.com/libp2p/go-libp2p-core/peer"
	"github.com/libp2p/go-libp2p-core/peerstore"
	"github.com/libp2p/go-libp2p-core/protocol"
	"github.com/libp2p/go-libp2p-core/routing"

19
	"go.opencensus.io/tag"
20 21
	"golang.org/x/xerrors"

22
	"github.com/libp2p/go-libp2p-kad-dht/metrics"
23 24
	opts "github.com/libp2p/go-libp2p-kad-dht/opts"
	pb "github.com/libp2p/go-libp2p-kad-dht/pb"
Steven Allen's avatar
Steven Allen committed
25
	providers "github.com/libp2p/go-libp2p-kad-dht/providers"
26

Steven Allen's avatar
Steven Allen committed
27 28
	proto "github.com/gogo/protobuf/proto"
	cid "github.com/ipfs/go-cid"
29 30
	ds "github.com/ipfs/go-datastore"
	logging "github.com/ipfs/go-log"
Steven Allen's avatar
Steven Allen committed
31 32
	goprocess "github.com/jbenet/goprocess"
	goprocessctx "github.com/jbenet/goprocess/context"
George Antoniadis's avatar
George Antoniadis committed
33
	kb "github.com/libp2p/go-libp2p-kbucket"
Steven Allen's avatar
Steven Allen committed
34
	record "github.com/libp2p/go-libp2p-record"
George Antoniadis's avatar
George Antoniadis committed
35
	recpb "github.com/libp2p/go-libp2p-record/pb"
Steven Allen's avatar
Steven Allen committed
36
	base32 "github.com/whyrusleeping/base32"
37 38
)

Matt Joiner's avatar
Matt Joiner committed
39
// logger is the package-level logger for the dht subsystem.
var logger = logging.Logger("dht")
40

41
// IpfsDHT is an implementation of Kademlia with S/Kademlia modifications.
// It is used to implement the base Routing module.
type IpfsDHT struct {
	host      host.Host           // the network services we need
	self      peer.ID             // Local peer (yourself)
	peerstore peerstore.Peerstore // Peer Registry

	datastore ds.Datastore // Local data

	routingTable *kb.RoutingTable           // Array of routing tables for differently distanced nodes
	providers    *providers.ProviderManager // manages the provider records we store and serve

	birth time.Time // When this peer started up

	Validator record.Validator // validates records before they are stored or returned

	ctx  context.Context   // base context; re-tagged with metrics metadata in makeDHT
	proc goprocess.Process // process tree; closing it tears the DHT down

	strmap map[peer.ID]*messageSender // per-peer message senders
	smlk   sync.Mutex                 // guards strmap

	plk sync.Mutex // NOTE(review): exact scope not visible in this file — confirm what plk guards

	// stripedPutLocks serialize concurrent puts that hash to the same stripe.
	stripedPutLocks [256]sync.Mutex

	protocols []protocol.ID // DHT protocols

	bucketSize int // Kademlia bucket size (k)

	bootstrapCfg opts.BootstrapConfig // bootstrap behavior, copied from options in New

	// rtRecoveryChan carries requests to the rtRecovery proc when the
	// routing table becomes empty (see makeDHT / rtRecovery).
	rtRecoveryChan chan *rtRecoveryReq
}

Matt Joiner's avatar
Matt Joiner committed
76 77 78 79
// Assert that IPFS assumptions about interfaces aren't broken. These aren't a
// guarantee, but we can use them to aid refactoring. (Compile-time checks
// only; they have no runtime cost.)
var (
	_ routing.ContentRouting = (*IpfsDHT)(nil)
	_ routing.Routing        = (*IpfsDHT)(nil)
	_ routing.PeerRouting    = (*IpfsDHT)(nil)
	_ routing.PubKeyFetcher  = (*IpfsDHT)(nil)
	_ routing.ValueStore     = (*IpfsDHT)(nil)
)

86 87 88 89 90 91 92 93 94
// rtRecoveryReq is a request to the routing-table recovery worker. The id
// correlates log lines for one recovery attempt; errorChan reports the
// outcome back to the requester.
type rtRecoveryReq struct {
	id        string
	errorChan chan error
}

// mkRtRecoveryReq builds a recovery request with a fresh UUID and a
// buffered error channel so the responder can reply without blocking.
// (Named fields instead of a positional literal so future field
// reordering can't silently swap values.)
func mkRtRecoveryReq() *rtRecoveryReq {
	return &rtRecoveryReq{
		id:        uuid.New().String(),
		errorChan: make(chan error, 1),
	}
}

95 96 97
// New creates a new DHT with the specified host and options.
func New(ctx context.Context, h host.Host, options ...opts.Option) (*IpfsDHT, error) {
	var cfg opts.Options
	cfg.BucketSize = KValue
	// Apply Defaults first so caller-supplied options can override them.
	if err := cfg.Apply(append([]opts.Option{opts.Defaults}, options...)...); err != nil {
		return nil, err
	}
	dht := makeDHT(ctx, h, cfg.Datastore, cfg.Protocols, cfg.BucketSize)
	dht.bootstrapCfg = cfg.BootstrapConfig

	// register for network notifs.
	dht.host.Network().Notify((*netNotifiee)(dht))

	dht.proc = goprocessctx.WithContextAndTeardown(ctx, func() error {
		// remove ourselves from network notifs.
		dht.host.Network().StopNotify((*netNotifiee)(dht))
		return nil
	})

	// Tie the provider manager's lifetime to the DHT's process tree.
	dht.proc.AddChild(dht.providers.Process())
	dht.Validator = cfg.Validator

	// RT recovery proc: services rtRecoveryChan requests (see rtRecovery).
	rtRecoveryProc := goprocessctx.WithContext(ctx)
	rtRecoveryProc.Go(dht.rtRecovery)
	dht.proc.AddChild(rtRecoveryProc)

	// In client mode we only issue queries; we never answer them, so no
	// stream handlers are installed.
	if !cfg.Client {
		for _, p := range cfg.Protocols {
			h.SetStreamHandler(p, dht.handleNewStream)
		}
	}
	return dht, nil
}
129

130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149
// NewDHT creates a new DHT object with the given peer as the 'local' host.
// IpfsDHT's initialized with this function will respond to DHT requests,
// whereas IpfsDHT's initialized with NewDHTClient will not.
func NewDHT(ctx context.Context, h host.Host, dstore ds.Batching) *IpfsDHT {
	dht, err := New(ctx, h, opts.Datastore(dstore))
	if err != nil {
		// New can only fail on bad options; with just a datastore option
		// that is a programmer error, hence the deliberate panic.
		panic(err)
	}
	return dht
}

// NewDHTClient creates a new DHT object with the given peer as the 'local'
// host. IpfsDHT clients initialized with this function will not respond to DHT
// requests. If you need a peer to respond to DHT requests, use NewDHT instead.
func NewDHTClient(ctx context.Context, h host.Host, dstore ds.Batching) *IpfsDHT {
	dht, err := New(ctx, h, opts.Datastore(dstore), opts.Client(true))
	if err != nil {
		// As in NewDHT, an option error here is a programmer error.
		panic(err)
	}
	return dht
}

153 154
// makeDHT wires up the routing table, provider manager, and recovery
// channel for a new IpfsDHT. It does not register network notifiees or
// stream handlers — New does that after calling makeDHT.
func makeDHT(ctx context.Context, h host.Host, dstore ds.Batching, protocols []protocol.ID, bucketSize int) *IpfsDHT {
	rt := kb.NewRoutingTable(bucketSize, kb.ConvertPeerID(h.ID()), time.Minute, h.Peerstore())
	rtRecoveryChan := make(chan *rtRecoveryReq)

	cmgr := h.ConnManager()

	// Tag routing-table members in the connection manager so their
	// connections are preferentially kept open.
	rt.PeerAdded = func(p peer.ID) {
		cmgr.TagPeer(p, "kbucket", 5)
	}

	rt.PeerRemoved = func(p peer.ID) {
		cmgr.UntagPeer(p, "kbucket")
		// If this removal emptied the routing table, ask the rtRecovery
		// proc to attempt a recovery. Done in a goroutine so the
		// routing-table callback itself never blocks; the goroutine bails
		// out on ctx cancellation at every blocking point.
		go func(rtRecoveryChan chan *rtRecoveryReq) {
			if rt.Size() == 0 {
				req := mkRtRecoveryReq()
				logger.Warningf("rt peer removed notification: RT is empty, will attempt to initiate recovery, reqID=%s", req.id)
				select {
				case <-ctx.Done():
					return
				case rtRecoveryChan <- req:
					// Wait for the recovery proc's response so the request
					// isn't abandoned mid-flight.
					select {
					case <-ctx.Done():
						return
					case <-req.errorChan:
						// TODO Do we need to do anything here ?
					}
				}
			}
		}(rtRecoveryChan)
	}

	dht := &IpfsDHT{
		datastore:      dstore,
		self:           h.ID(),
		peerstore:      h.Peerstore(),
		host:           h,
		strmap:         make(map[peer.ID]*messageSender),
		ctx:            ctx,
		providers:      providers.NewProviderManager(ctx, h.ID(), dstore),
		birth:          time.Now(),
		routingTable:   rt,
		protocols:      protocols,
		bucketSize:     bucketSize,
		rtRecoveryChan: rtRecoveryChan,
	}

	// Replace the plain context with one carrying metrics tags
	// (peer ID / instance ID).
	dht.ctx = dht.newContextWithLocalTags(ctx)

	return dht
}

204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235
// rtRecovery is the long-running worker (started in New) that services
// requests on dht.rtRecoveryChan to re-populate an empty routing table.
// It runs until the DHT's goprocess starts closing.
func (dht *IpfsDHT) rtRecovery(proc goprocess.Process) {
	// writeResp delivers err to the requester unless we are shutting down,
	// then always closes the channel so the requester cannot block forever.
	writeResp := func(errorChan chan error, err error) {
		select {
		case <-proc.Closing():
		case errorChan <- err:
		}
		close(errorChan)
	}

	for {
		select {
		case req := <-dht.rtRecoveryChan:
			if dht.routingTable.Size() == 0 {
				logger.Infof("rt recovery proc: received request with reqID=%s, RT is empty. initiating recovery", req.id)
				// TODO Call Seeder with default bootstrap peers here once #383 is merged
				// NOTE(review): until the seeding call above is implemented,
				// the success branch below is unreachable — the size is
				// re-checked with nothing having changed in between.
				if dht.routingTable.Size() > 0 {
					logger.Infof("rt recovery proc: successfully recovered RT for reqID=%s, RT size is now %d", req.id, dht.routingTable.Size())
					go writeResp(req.errorChan, nil)
				} else {
					logger.Errorf("rt recovery proc: failed to recover RT for reqID=%s, RT is still empty", req.id)
					go writeResp(req.errorChan, errors.New("RT empty after seed attempt"))
				}
			} else {
				logger.Infof("rt recovery proc: RT is not empty, no need to act on request with reqID=%s", req.id)
				go writeResp(req.errorChan, nil)
			}
		case <-proc.Closing():
			return
		}
	}
}

Jeromy's avatar
Jeromy committed
236
// putValueToPeer stores the given key/value pair at the peer 'p'
237
func (dht *IpfsDHT) putValueToPeer(ctx context.Context, p peer.ID, rec *recpb.Record) error {
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
238

239
	pmes := pb.NewMessage(pb.Message_PUT_VALUE, rec.Key, 0)
240
	pmes.Record = rec
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
241
	rpmes, err := dht.sendRequest(ctx, p, pmes)
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
242
	if err != nil {
Matt Joiner's avatar
Matt Joiner committed
243
		logger.Debugf("putValueToPeer: %v. (peer: %s, key: %s)", err, p.Pretty(), loggableKey(string(rec.Key)))
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
244 245
		return err
	}
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
246

247
	if !bytes.Equal(rpmes.GetRecord().Value, pmes.GetRecord().Value) {
Matt Joiner's avatar
Matt Joiner committed
248
		logger.Warningf("putValueToPeer: value not put correctly. (%v != %v)", pmes, rpmes)
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
249 250
		return errors.New("value not put correctly")
	}
gpestana's avatar
gpestana committed
251

Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
252
	return nil
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
253 254
}

255 256
var errInvalidRecord = errors.New("received invalid record")

257 258
// getValueOrPeers queries a particular peer p for the value for
// key. It returns either the value or a list of closer peers.
259
// NOTE: It will update the dht's peerstore with any new addresses
260
// it finds for the given peer.
261
func (dht *IpfsDHT) getValueOrPeers(ctx context.Context, p peer.ID, key string) (*recpb.Record, []*peer.AddrInfo, error) {
262

263
	pmes, err := dht.getValueSingle(ctx, p, key)
264
	if err != nil {
265
		return nil, nil, err
266 267
	}

268 269 270
	// Perhaps we were given closer peers
	peers := pb.PBPeersToPeerInfos(pmes.GetCloserPeers())

271
	if record := pmes.GetRecord(); record != nil {
272
		// Success! We were given the value
Matt Joiner's avatar
Matt Joiner committed
273
		logger.Debug("getValueOrPeers: got value")
274

275
		// make sure record is valid.
276
		err = dht.Validator.Validate(string(record.GetKey()), record.GetValue())
277
		if err != nil {
Matt Joiner's avatar
Matt Joiner committed
278
			logger.Info("Received invalid record! (discarded)")
279 280
			// return a sentinal to signify an invalid record was received
			err = errInvalidRecord
George Antoniadis's avatar
George Antoniadis committed
281
			record = new(recpb.Record)
282
		}
283
		return record, peers, err
284
	}
285

286
	if len(peers) > 0 {
Matt Joiner's avatar
Matt Joiner committed
287
		logger.Debug("getValueOrPeers: peers")
288 289 290
		return nil, peers, nil
	}

Matt Joiner's avatar
Matt Joiner committed
291
	logger.Warning("getValueOrPeers: routing.ErrNotFound")
292
	return nil, nil, routing.ErrNotFound
293 294
}

295
// getValueSingle simply performs the get value RPC with the given parameters
296 297 298 299 300 301
func (dht *IpfsDHT) getValueSingle(ctx context.Context, p peer.ID, key string) (*pb.Message, error) {
	meta := logging.LoggableMap{
		"key":  key,
		"peer": p,
	}

Matt Joiner's avatar
Matt Joiner committed
302
	eip := logger.EventBegin(ctx, "getValueSingle", meta)
ForrestWeston's avatar
ForrestWeston committed
303
	defer eip.Done()
304

305
	pmes := pb.NewMessage(pb.Message_GET_VALUE, []byte(key), 0)
306 307 308 309 310
	resp, err := dht.sendRequest(ctx, p, pmes)
	switch err {
	case nil:
		return resp, nil
	case ErrReadTimeout:
Matt Joiner's avatar
Matt Joiner committed
311
		logger.Warningf("getValueSingle: read timeout %s %s", p.Pretty(), key)
312 313
		fallthrough
	default:
ForrestWeston's avatar
ForrestWeston committed
314
		eip.SetError(err)
315 316
		return nil, err
	}
Jeromy's avatar
Jeromy committed
317 318
}

319
// getLocal attempts to retrieve the value from the datastore
320
func (dht *IpfsDHT) getLocal(key string) (*recpb.Record, error) {
Matt Joiner's avatar
Matt Joiner committed
321
	logger.Debugf("getLocal %s", key)
322
	rec, err := dht.getRecordFromDatastore(mkDsKey(key))
323
	if err != nil {
Matt Joiner's avatar
Matt Joiner committed
324
		logger.Warningf("getLocal: %s", err)
325 326
		return nil, err
	}
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
327

328
	// Double check the key. Can't hurt.
329
	if rec != nil && string(rec.GetKey()) != key {
Matt Joiner's avatar
Matt Joiner committed
330
		logger.Errorf("BUG getLocal: found a DHT record that didn't match it's key: %s != %s", rec.GetKey(), key)
Steven Allen's avatar
Steven Allen committed
331
		return nil, nil
332 333

	}
334
	return rec, nil
335 336
}

337
// putLocal stores the key value pair in the datastore
338
func (dht *IpfsDHT) putLocal(key string, rec *recpb.Record) error {
Matt Joiner's avatar
Matt Joiner committed
339
	logger.Debugf("putLocal: %v %v", key, rec)
340 341
	data, err := proto.Marshal(rec)
	if err != nil {
Matt Joiner's avatar
Matt Joiner committed
342
		logger.Warningf("putLocal: %s", err)
343 344 345
		return err
	}

346
	return dht.datastore.Put(mkDsKey(key), data)
347
}
348

349
// Update signals the routingTable to Update its last-seen status
// on the given peer. It also emits an "updatePeer" event on ctx.
func (dht *IpfsDHT) Update(ctx context.Context, p peer.ID) {
	logger.Event(ctx, "updatePeer", p)
	dht.routingTable.Update(p)
}
Jeromy's avatar
Jeromy committed
355

Jeromy's avatar
Jeromy committed
356
// FindLocal looks for a peer with a given ID connected to this dht and returns the peer and the table it was found in.
357
func (dht *IpfsDHT) FindLocal(id peer.ID) peer.AddrInfo {
358
	switch dht.host.Network().Connectedness(id) {
359
	case network.Connected, network.CanConnect:
360 361
		return dht.peerstore.PeerInfo(id)
	default:
362
		return peer.AddrInfo{}
Jeromy's avatar
Jeromy committed
363 364
	}
}
365

Jeromy's avatar
Jeromy committed
366
// findPeerSingle asks peer 'p' if they know where the peer with id 'id' is
367
func (dht *IpfsDHT) findPeerSingle(ctx context.Context, p peer.ID, id peer.ID) (*pb.Message, error) {
Matt Joiner's avatar
Matt Joiner committed
368
	eip := logger.EventBegin(ctx, "findPeerSingle",
369 370 371 372
		logging.LoggableMap{
			"peer":   p,
			"target": id,
		})
ForrestWeston's avatar
ForrestWeston committed
373
	defer eip.Done()
374

375
	pmes := pb.NewMessage(pb.Message_FIND_NODE, []byte(id), 0)
376 377 378 379 380
	resp, err := dht.sendRequest(ctx, p, pmes)
	switch err {
	case nil:
		return resp, nil
	case ErrReadTimeout:
Matt Joiner's avatar
Matt Joiner committed
381
		logger.Warningf("read timeout: %s %s", p.Pretty(), id)
382 383
		fallthrough
	default:
ForrestWeston's avatar
ForrestWeston committed
384
		eip.SetError(err)
385 386
		return nil, err
	}
387
}
388

389
func (dht *IpfsDHT) findProvidersSingle(ctx context.Context, p peer.ID, key cid.Cid) (*pb.Message, error) {
Matt Joiner's avatar
Matt Joiner committed
390
	eip := logger.EventBegin(ctx, "findProvidersSingle", p, key)
ForrestWeston's avatar
ForrestWeston committed
391
	defer eip.Done()
392

393
	pmes := pb.NewMessage(pb.Message_GET_PROVIDERS, key.Bytes(), 0)
394 395 396 397 398
	resp, err := dht.sendRequest(ctx, p, pmes)
	switch err {
	case nil:
		return resp, nil
	case ErrReadTimeout:
Matt Joiner's avatar
Matt Joiner committed
399
		logger.Warningf("read timeout: %s %s", p.Pretty(), key)
400 401
		fallthrough
	default:
ForrestWeston's avatar
ForrestWeston committed
402
		eip.SetError(err)
403 404
		return nil, err
	}
Jeromy's avatar
Jeromy committed
405 406
}

407
// nearestPeersToQuery returns the routing tables closest peers.
408
func (dht *IpfsDHT) nearestPeersToQuery(pmes *pb.Message, count int) []peer.ID {
409
	closer := dht.routingTable.NearestPeers(kb.ConvertKey(string(pmes.GetKey())), count)
410 411 412
	return closer
}

413
// betterPeersToQuery returns nearestPeersToQuery, but if and only if closer than self.
414
func (dht *IpfsDHT) betterPeersToQuery(pmes *pb.Message, p peer.ID, count int) []peer.ID {
415
	closer := dht.nearestPeersToQuery(pmes, count)
416 417 418

	// no node? nil
	if closer == nil {
Matt Joiner's avatar
Matt Joiner committed
419
		logger.Warning("betterPeersToQuery: no closer peers to send:", p)
420 421 422
		return nil
	}

Steven Allen's avatar
Steven Allen committed
423
	filtered := make([]peer.ID, 0, len(closer))
Jeromy's avatar
Jeromy committed
424 425 426
	for _, clp := range closer {

		// == to self? thats bad
Jeromy's avatar
Jeromy committed
427
		if clp == dht.self {
Matt Joiner's avatar
Matt Joiner committed
428
			logger.Error("BUG betterPeersToQuery: attempted to return self! this shouldn't happen...")
429 430
			return nil
		}
431
		// Dont send a peer back themselves
Jeromy's avatar
Jeromy committed
432
		if clp == p {
433 434 435
			continue
		}

Jeromy's avatar
Jeromy committed
436
		filtered = append(filtered, clp)
437 438
	}

439 440
	// ok seems like closer nodes
	return filtered
441 442
}

443 444 445 446 447 448 449 450 451 452
// Context returns the DHT's base context (tagged with metrics metadata
// in makeDHT).
func (dht *IpfsDHT) Context() context.Context {
	return dht.ctx
}

// Process returns the DHT's goprocess; closing it shuts the DHT down.
func (dht *IpfsDHT) Process() goprocess.Process {
	return dht.proc
}

ZhengQi's avatar
ZhengQi committed
453 454 455 456 457
// RoutingTable returns the DHT's kbucket routing table.
func (dht *IpfsDHT) RoutingTable() *kb.RoutingTable {
	return dht.routingTable
}

458 459 460 461
// Close shuts the DHT down by closing its goprocess tree (which runs the
// teardown registered in New).
func (dht *IpfsDHT) Close() error {
	return dht.proc.Close()
}
462

463 464
func (dht *IpfsDHT) protocolStrs() []string {
	pstrs := make([]string, len(dht.protocols))
465 466
	for idx, proto := range dht.protocols {
		pstrs[idx] = string(proto)
467 468 469 470 471
	}

	return pstrs
}

472 473 474
// mkDsKey maps an arbitrary binary DHT key to a datastore key by
// base32-encoding it.
func mkDsKey(s string) ds.Key {
	return ds.NewKey(base32.RawStdEncoding.EncodeToString([]byte(s)))
}
475 476 477 478 479 480 481 482 483 484 485 486 487 488 489 490 491 492 493 494 495 496 497 498

// PeerID returns this node's own peer ID.
func (dht *IpfsDHT) PeerID() peer.ID {
	return dht.self
}

// PeerKey returns this node's key in the DHT keyspace
// (kb.ConvertPeerID of its peer ID).
func (dht *IpfsDHT) PeerKey() []byte {
	return kb.ConvertPeerID(dht.self)
}

// Host returns the libp2p host this DHT is running on.
func (dht *IpfsDHT) Host() host.Host {
	return dht.host
}

// Ping sends a DHT PING message to peer p and verifies that the reply is
// also a PING, returning an error on transport failure or an unexpected
// response type.
func (dht *IpfsDHT) Ping(ctx context.Context, p peer.ID) error {
	pmes := pb.NewMessage(pb.Message_PING, nil, 0)

	rpmes, err := dht.sendRequest(ctx, p, pmes)
	if err != nil {
		return xerrors.Errorf("sending request: %w", err)
	}
	if rpmes.Type != pb.Message_PING {
		return xerrors.Errorf("got unexpected response type: %v", rpmes.Type)
	}
	return nil
}
499 500 501 502 503 504 505 506 507 508 509 510 511 512 513 514

// newContextWithLocalTags returns a copy of ctx tagged with this DHT's
// peer ID and instance ID (for metrics), plus any extra tag mutators the
// caller supplies.
func (dht *IpfsDHT) newContextWithLocalTags(ctx context.Context, extraTags ...tag.Mutator) context.Context {
	muts := append(extraTags,
		tag.Upsert(metrics.KeyPeerID, dht.self.Pretty()),
		tag.Upsert(metrics.KeyInstanceID, fmt.Sprintf("%p", dht)),
	)
	// The tagging error is unrelated to the actual function of this code,
	// so it is deliberately ignored.
	tagged, _ := tag.New(ctx, muts...)
	return tagged
}