table.go 14.6 KB
Newer Older
1 2
// Package kbucket implements a Kademlia 'k-bucket' routing table.
package kbucket
3 4

import (
5
	"context"
6
	"encoding/binary"
7
	"errors"
8
	"fmt"
9
	"math/rand"
10
	"sync"
11
	"time"
12

13 14 15
	"github.com/libp2p/go-libp2p-core/peer"
	"github.com/libp2p/go-libp2p-core/peerstore"

George Antoniadis's avatar
George Antoniadis committed
16
	logging "github.com/ipfs/go-log"
Aarsh Shah's avatar
Aarsh Shah committed
17 18
	"github.com/jbenet/goprocess"
	goprocessctx "github.com/jbenet/goprocess/context"
19
	mh "github.com/multiformats/go-multihash"
20 21
)

Jeromy's avatar
Jeromy committed
22
var log = logging.Logger("table")
23

24 25 26
// Sentinel errors returned by HandlePeerAlive when a peer is not admitted
// to the routing table.
var (
	// ErrPeerRejectedHighLatency is returned when the peer's measured latency
	// exceeds the table's maximum acceptable latency.
	ErrPeerRejectedHighLatency = errors.New("peer rejected; latency too high")
	// ErrPeerRejectedNoCapacity is returned when the peer's bucket has no room left.
	ErrPeerRejectedNoCapacity = errors.New("peer rejected; insufficient capacity")
)

Aarsh Shah's avatar
Aarsh Shah committed
27
// PeerSelectionFunc is the signature of a function that selects zero or more peers from the given peers
28
// based on some criteria.
Aarsh Shah's avatar
Aarsh Shah committed
29
type PeerSelectionFunc func(peers []PeerInfo) []PeerInfo
30

Aarsh Shah's avatar
Aarsh Shah committed
31 32
// PeerValidationFunc is the signature of a function that determines the validity of a peer for Routing Table membership.
type PeerValidationFunc func(ctx context.Context, p peer.ID) bool
33

Aarsh Shah's avatar
Aarsh Shah committed
34 35 36
// maxCplForRefresh is the maximum cpl we support for refresh.
// This limit exists because we can only generate 'maxCplForRefresh' bit prefixes for now.
const maxCplForRefresh uint = 15
Aarsh Shah's avatar
Aarsh Shah committed
37

Aarsh Shah's avatar
Aarsh Shah committed
38 39
// CplRefresh pairs a CPL (common prefix length) with the host and the last
// time that CPL was refreshed, i.e. the last time we searched for an ID that
// has that CPL with the host.
type CplRefresh struct {
	// Cpl is the common prefix length being tracked.
	Cpl uint
	// LastRefreshAt is when Cpl was last refreshed.
	LastRefreshAt time.Time
}

45 46
// RoutingTable defines the routing table.
type RoutingTable struct {
47
	ctx context.Context
48 49 50 51 52 53
	// ID of the local peer
	local ID

	// Blanket lock, refine later for better performance
	tabLock sync.RWMutex

54
	// latency metrics
55
	metrics peerstore.Metrics
56

57 58 59
	// Maximum acceptable latency for peers in this cluster
	maxLatency time.Duration

60
	// kBuckets define all the fingers to other nodes.
Aarsh Shah's avatar
Aarsh Shah committed
61
	buckets    []*bucket
62
	bucketsize int
Jeromy's avatar
Jeromy committed
63

Aarsh Shah's avatar
Aarsh Shah committed
64 65 66
	cplRefreshLk   sync.RWMutex
	cplRefreshedAt map[uint]time.Time

67 68 69
	// replacement candidates for a Cpl
	cplReplacementCache *cplReplacementCache

Jeromy's avatar
Jeromy committed
70 71 72
	// notification functions
	PeerRemoved func(peer.ID)
	PeerAdded   func(peer.ID)
73 74

	// function to determine the validity of a peer for RT membership
Aarsh Shah's avatar
Aarsh Shah committed
75
	peerValidationFnc PeerValidationFunc
76 77 78 79 80 81

	// timeout for a single call to the peer validation function
	peerValidationTimeout time.Duration
	// interval between two runs of the table cleanup routine
	tableCleanupInterval time.Duration
	// function to select peers that need to be validated
Aarsh Shah's avatar
Aarsh Shah committed
82
	peersForValidationFnc PeerSelectionFunc
Aarsh Shah's avatar
Aarsh Shah committed
83 84

	proc goprocess.Process
85 86
}

Chas Leichner's avatar
Chas Leichner committed
87
// NewRoutingTable creates a new routing table with a given bucketsize, local ID, and latency tolerance.
Aarsh Shah's avatar
Aarsh Shah committed
88
// Passing a nil PeerValidationFunc disables periodic table cleanup.
Aarsh Shah's avatar
Aarsh Shah committed
89
func NewRoutingTable(bucketsize int, localID ID, latency time.Duration, m peerstore.Metrics,
Aarsh Shah's avatar
Aarsh Shah committed
90
	opts ...option) (*RoutingTable, error) {
91

Aarsh Shah's avatar
Aarsh Shah committed
92 93
	var cfg options
	if err := cfg.Apply(append([]option{Defaults}, opts...)...); err != nil {
94 95 96
		return nil, err
	}

Jeromy's avatar
Jeromy committed
97
	rt := &RoutingTable{
Aarsh Shah's avatar
Aarsh Shah committed
98
		ctx:        context.Background(),
Aarsh Shah's avatar
Aarsh Shah committed
99
		buckets:    []*bucket{newBucket()},
100 101 102 103 104 105
		bucketsize: bucketsize,
		local:      localID,

		maxLatency: latency,
		metrics:    m,

Aarsh Shah's avatar
Aarsh Shah committed
106
		cplRefreshedAt: make(map[uint]time.Time),
107 108 109 110

		PeerRemoved: func(peer.ID) {},
		PeerAdded:   func(peer.ID) {},

Aarsh Shah's avatar
Aarsh Shah committed
111 112 113 114
		peerValidationFnc:     cfg.tableCleanup.peerValidationFnc,
		peersForValidationFnc: cfg.tableCleanup.peersForValidationFnc,
		peerValidationTimeout: cfg.tableCleanup.peerValidationTimeout,
		tableCleanupInterval:  cfg.tableCleanup.interval,
Jeromy's avatar
Jeromy committed
115 116
	}

117
	rt.cplReplacementCache = newCplReplacementCache(rt.local, rt.bucketsize)
Aarsh Shah's avatar
Aarsh Shah committed
118
	rt.proc = goprocessctx.WithContext(rt.ctx)
119

Aarsh Shah's avatar
Aarsh Shah committed
120 121
	// schedule periodic RT cleanup if peer validation function has been passed
	if rt.peerValidationFnc != nil {
Aarsh Shah's avatar
Aarsh Shah committed
122
		rt.proc.Go(rt.cleanup)
123 124 125 126 127
	}

	return rt, nil
}

Aarsh Shah's avatar
Aarsh Shah committed
128 129 130 131 132 133 134
// Close shuts down the Routing Table & all associated processes by closing
// the table's process, which makes the cleanup routine (if one is running)
// return. It is safe to call this multiple times.
func (rt *RoutingTable) Close() error {
	return rt.proc.Close()
}

func (rt *RoutingTable) cleanup(proc goprocess.Process) {
135 136 137
	validatePeerF := func(p peer.ID) bool {
		queryCtx, cancel := context.WithTimeout(rt.ctx, rt.peerValidationTimeout)
		defer cancel()
Aarsh Shah's avatar
Aarsh Shah committed
138
		return rt.peerValidationFnc(queryCtx, p)
139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180
	}

	cleanupTickr := time.NewTicker(rt.tableCleanupInterval)
	defer cleanupTickr.Stop()
	for {
		select {
		case <-cleanupTickr.C:
			ps := rt.peersToValidate()
			for _, pinfo := range ps {
				// continue if we are able to successfully validate the peer
				// it will be marked alive in the RT when the DHT connection notification handler calls RT.HandlePeerAlive()
				// TODO Should we revisit this ? It makes more sense for the RT to mark it as active here
				if validatePeerF(pinfo.Id) {
					log.Infof("successfully validated missing peer=%s", pinfo.Id)
					continue
				}

				// peer does not seem to be alive, let's try candidates now
				log.Infof("failed to validate missing peer=%s, will try candidates now...", pinfo.Id)
				// evict missing peer
				rt.HandlePeerDead(pinfo.Id)

				// keep trying replacement candidates for the missing peer till we get a successful validation or
				// we run out of candidates
				cpl := uint(CommonPrefixLen(ConvertPeerID(pinfo.Id), rt.local))
				c, notEmpty := rt.cplReplacementCache.pop(cpl)
				for notEmpty {
					if validatePeerF(c) {
						log.Infof("successfully validated candidate=%s for missing peer=%s", c, pinfo.Id)
						break
					}
					log.Infof("failed to validated candidate=%s", c)
					// remove candidate
					rt.HandlePeerDead(c)

					c, notEmpty = rt.cplReplacementCache.pop(cpl)
				}

				if !notEmpty {
					log.Infof("failed to replace missing peer=%s as all candidates were invalid", pinfo.Id)
				}
			}
Aarsh Shah's avatar
Aarsh Shah committed
181 182
		case <-proc.Closing():
			return
183 184 185 186 187 188 189 190 191 192
		}
	}
}

// returns the peers that need to be validated.
func (rt *RoutingTable) peersToValidate() []PeerInfo {
	rt.tabLock.RLock()
	defer rt.tabLock.RUnlock()

	var peers []PeerInfo
Aarsh Shah's avatar
Aarsh Shah committed
193
	for _, b := range rt.buckets {
194 195 196
		peers = append(peers, b.peers()...)
	}
	return rt.peersForValidationFnc(peers)
197 198
}

Aarsh Shah's avatar
Aarsh Shah committed
199 200
// GetTrackedCplsForRefresh returns the Cpl's we are tracking for refresh.
// Caller is free to modify the returned slice as it is a defensive copy.
Aarsh Shah's avatar
Aarsh Shah committed
201
func (rt *RoutingTable) GetTrackedCplsForRefresh() []CplRefresh {
Aarsh Shah's avatar
Aarsh Shah committed
202 203
	rt.cplRefreshLk.RLock()
	defer rt.cplRefreshLk.RUnlock()
Aarsh Shah's avatar
Aarsh Shah committed
204

Aarsh Shah's avatar
Aarsh Shah committed
205
	cpls := make([]CplRefresh, 0, len(rt.cplRefreshedAt))
Aarsh Shah's avatar
Aarsh Shah committed
206 207

	for c, t := range rt.cplRefreshedAt {
Aarsh Shah's avatar
Aarsh Shah committed
208
		cpls = append(cpls, CplRefresh{c, t})
209 210
	}

Aarsh Shah's avatar
Aarsh Shah committed
211 212 213 214 215
	return cpls
}

// GenRandPeerID generates a random peerID for a given Cpl
func (rt *RoutingTable) GenRandPeerID(targetCpl uint) (peer.ID, error) {
Aarsh Shah's avatar
Aarsh Shah committed
216 217
	if targetCpl > maxCplForRefresh {
		return "", fmt.Errorf("cannot generate peer ID for Cpl greater than %d", maxCplForRefresh)
218 219
	}

220
	localPrefix := binary.BigEndian.Uint16(rt.local)
Aarsh Shah's avatar
Aarsh Shah committed
221 222 223 224 225 226 227 228 229 230 231

	// For host with ID `L`, an ID `K` belongs to a bucket with ID `B` ONLY IF CommonPrefixLen(L,K) is EXACTLY B.
	// Hence, to achieve a targetPrefix `T`, we must toggle the (T+1)th bit in L & then copy (T+1) bits from L
	// to our randomly generated prefix.
	toggledLocalPrefix := localPrefix ^ (uint16(0x8000) >> targetCpl)
	randPrefix := uint16(rand.Uint32())

	// Combine the toggled local prefix and the random bits at the correct offset
	// such that ONLY the first `targetCpl` bits match the local ID.
	mask := (^uint16(0)) << (16 - (targetCpl + 1))
	targetPrefix := (toggledLocalPrefix & mask) | (randPrefix & ^mask)
232

233 234 235
	// Convert to a known peer ID.
	key := keyPrefixMap[targetPrefix]
	id := [34]byte{mh.SHA2_256, 32}
236
	binary.BigEndian.PutUint32(id[2:], key)
Aarsh Shah's avatar
Aarsh Shah committed
237
	return peer.ID(id[:]), nil
238 239
}

Aarsh Shah's avatar
Aarsh Shah committed
240 241
// ResetCplRefreshedAtForID resets the refresh time for the Cpl of the given ID.
func (rt *RoutingTable) ResetCplRefreshedAtForID(id ID, newTime time.Time) {
242
	cpl := CommonPrefixLen(id, rt.local)
Aarsh Shah's avatar
Aarsh Shah committed
243
	if uint(cpl) > maxCplForRefresh {
Aarsh Shah's avatar
Aarsh Shah committed
244
		return
245 246
	}

Aarsh Shah's avatar
Aarsh Shah committed
247 248 249 250
	rt.cplRefreshLk.Lock()
	defer rt.cplRefreshLk.Unlock()

	rt.cplRefreshedAt[uint(cpl)] = newTime
251 252
}

253 254 255
// HandlePeerDisconnect should be called when the caller detects a disconnection with the peer.
// This enables the Routing Table to mark the peer as missing.
func (rt *RoutingTable) HandlePeerDisconnect(p peer.ID) {
256 257
	rt.tabLock.Lock()
	defer rt.tabLock.Unlock()
258 259

	bucketId := rt.bucketIdForPeer(p)
Aarsh Shah's avatar
Aarsh Shah committed
260 261 262
	// mark the peer as missing
	b := rt.buckets[bucketId]
	if peer := b.getPeer(p); peer != nil {
263
		peer.State = PeerStateMissing
264
	}
265 266 267 268 269 270 271 272
}

// HandlePeerAlive should be called when the caller detects that a peer is alive.
// This could be a successful incoming/outgoing connection with the peer or even a successful message delivery to/from the peer.
// This enables the RT to update it's internal state to mark the peer as active.
func (rt *RoutingTable) HandlePeerAlive(p peer.ID) (evicted peer.ID, err error) {
	rt.tabLock.Lock()
	defer rt.tabLock.Unlock()
273

274
	bucketID := rt.bucketIdForPeer(p)
Aarsh Shah's avatar
Aarsh Shah committed
275 276 277 278
	bucket := rt.buckets[bucketID]
	if peer := bucket.getPeer(p); peer != nil {
		// mark the peer as active
		peer.State = PeerStateActive
279

280
		return "", nil
281 282 283 284
	}

	if rt.metrics.LatencyEWMA(p) > rt.maxLatency {
		// Connection doesnt meet requirements, skip!
285 286 287 288
		return "", ErrPeerRejectedHighLatency
	}

	// We have enough space in the bucket (whether spawned or grouped).
289
	if bucket.len() < rt.bucketsize {
Aarsh Shah's avatar
Aarsh Shah committed
290
		bucket.pushFront(&PeerInfo{p, PeerStateActive})
291 292
		rt.PeerAdded(p)
		return "", nil
293 294
	}

Aarsh Shah's avatar
Aarsh Shah committed
295
	if bucketID == len(rt.buckets)-1 {
296 297 298
		// if the bucket is too large and this is the last bucket (i.e. wildcard), unfold it.
		rt.nextBucket()
		// the structure of the table has changed, so let's recheck if the peer now has a dedicated bucket.
299
		bucketID = rt.bucketIdForPeer(p)
Aarsh Shah's avatar
Aarsh Shah committed
300
		bucket = rt.buckets[bucketID]
301 302 303

		// push the peer only if the bucket isn't overflowing after slitting
		if bucket.len() < rt.bucketsize {
Aarsh Shah's avatar
Aarsh Shah committed
304
			bucket.pushFront(&PeerInfo{p, PeerStateActive})
305 306
			rt.PeerAdded(p)
			return "", nil
307 308
		}
	}
309 310
	// try to push it as a candidate in the replacement cache
	rt.cplReplacementCache.push(p)
311 312

	return "", ErrPeerRejectedNoCapacity
313 314
}

315 316 317 318 319
// HandlePeerDead should be called when the caller is sure that a peer is dead/not dialable.
// It evicts the peer from the Routing Table and also removes it as a replacement candidate if it is one.
func (rt *RoutingTable) HandlePeerDead(p peer.ID) {
	// remove it as a candidate
	rt.cplReplacementCache.remove(p)
320

321
	// remove it from the RT
Steven Allen's avatar
Steven Allen committed
322 323
	rt.tabLock.Lock()
	defer rt.tabLock.Unlock()
324
	bucketID := rt.bucketIdForPeer(p)
Aarsh Shah's avatar
Aarsh Shah committed
325
	bucket := rt.buckets[bucketID]
326
	if bucket.remove(p) {
327 328
		rt.PeerRemoved(p)
	}
329 330
}

331
func (rt *RoutingTable) nextBucket() {
332 333 334
	// This is the last bucket, which allegedly is a mixed bag containing peers not belonging in dedicated (unfolded) buckets.
	// _allegedly_ is used here to denote that *all* peers in the last bucket might feasibly belong to another bucket.
	// This could happen if e.g. we've unfolded 4 buckets, and all peers in folded bucket 5 really belong in bucket 8.
Aarsh Shah's avatar
Aarsh Shah committed
335 336 337
	bucket := rt.buckets[len(rt.buckets)-1]
	newBucket := bucket.split(len(rt.buckets)-1, rt.local)
	rt.buckets = append(rt.buckets, newBucket)
338

339
	// The newly formed bucket still contains too many peers. We probably just unfolded a empty bucket.
340
	if newBucket.len() >= rt.bucketsize {
341 342
		// Keep unfolding the table until the last bucket is not overflowing.
		rt.nextBucket()
343 344 345
	}
}

Jeromy's avatar
Jeromy committed
346
// Find a specific peer by ID or return nil
347
func (rt *RoutingTable) Find(id peer.ID) peer.ID {
Chas Leichner's avatar
Chas Leichner committed
348
	srch := rt.NearestPeers(ConvertPeerID(id), 1)
349 350
	if len(srch) == 0 || srch[0] != id {
		return ""
Jeromy's avatar
Jeromy committed
351 352 353 354
	}
	return srch[0]
}

355
// NearestPeer returns a single peer that is nearest to the given ID
356
func (rt *RoutingTable) NearestPeer(id ID) peer.ID {
357 358 359 360
	peers := rt.NearestPeers(id, 1)
	if len(peers) > 0 {
		return peers[0]
	}
361

Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
362
	log.Debugf("NearestPeer: Returning nil, table size = %d", rt.Size())
363
	return ""
364 365
}

366
// NearestPeers returns a list of the 'count' closest peers to the given ID
367
func (rt *RoutingTable) NearestPeers(id ID, count int) []peer.ID {
368 369 370 371
	// This is the number of bits _we_ share with the key. All peers in this
	// bucket share cpl bits with us and will therefore share at least cpl+1
	// bits with the given key. +1 because both the target and all peers in
	// this bucket differ from us in the cpl bit.
Matt Joiner's avatar
Matt Joiner committed
372
	cpl := CommonPrefixLen(id, rt.local)
373

374
	// It's assumed that this also protects the buckets.
Jeromy's avatar
Jeromy committed
375 376
	rt.tabLock.RLock()

Aarsh Shah's avatar
Aarsh Shah committed
377 378 379
	// Get bucket index or last bucket
	if cpl >= len(rt.buckets) {
		cpl = len(rt.buckets) - 1
380 381
	}

382
	pds := peerDistanceSorter{
383
		peers:  make([]peerDistance, 0, count+rt.bucketsize),
384 385
		target: id,
	}
386

387
	// Add peers from the target bucket (cpl+1 shared bits).
Aarsh Shah's avatar
Aarsh Shah committed
388
	pds.appendPeersFromList(rt.buckets[cpl].list)
389

390 391 392
	// If we're short, add peers from buckets to the right until we have
	// enough. All buckets to the right share exactly cpl bits (as opposed
	// to the cpl+1 bits shared by the peers in the cpl bucket).
393 394 395 396 397
	//
	// Unfortunately, to be completely correct, we can't just take from
	// buckets until we have enough peers because peers because _all_ of
	// these peers will be ~2**(256-cpl) from us.
	//
398 399
	// However, we're going to do that anyways as it's "good enough"

Aarsh Shah's avatar
Aarsh Shah committed
400 401
	for i := cpl + 1; i < len(rt.buckets) && pds.Len() < count; i++ {
		pds.appendPeersFromList(rt.buckets[i].list)
402 403
	}

404 405 406 407
	// If we're still short, add in buckets that share _fewer_ bits. We can
	// do this bucket by bucket because each bucket will share 1 fewer bit
	// than the last.
	//
Aarsh Shah's avatar
fix doc  
Aarsh Shah committed
408 409
	// * bucket cpl-1: cpl-1 shared bits.
	// * bucket cpl-2: cpl-2 shared bits.
410
	// ...
411
	for i := cpl - 1; i >= 0 && pds.Len() < count; i-- {
Aarsh Shah's avatar
Aarsh Shah committed
412
		pds.appendPeersFromList(rt.buckets[i].list)
413
	}
Jeromy's avatar
Jeromy committed
414
	rt.tabLock.RUnlock()
415 416

	// Sort by distance to local peer
417
	pds.sort()
418

419 420
	if count < pds.Len() {
		pds.peers = pds.peers[:count]
421 422
	}

423 424
	out := make([]peer.ID, 0, pds.Len())
	for _, p := range pds.peers {
425
		out = append(out, p.p)
426 427 428 429 430
	}

	return out
}

431
// Size returns the total number of peers in the routing table
432 433
func (rt *RoutingTable) Size() int {
	var tot int
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
434
	rt.tabLock.RLock()
Aarsh Shah's avatar
Aarsh Shah committed
435
	for _, buck := range rt.buckets {
436
		tot += buck.len()
437
	}
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
438
	rt.tabLock.RUnlock()
439 440 441
	return tot
}

Chas Leichner's avatar
Chas Leichner committed
442
// ListPeers takes a RoutingTable and returns a list of all peers from all buckets in the table.
443 444
func (rt *RoutingTable) ListPeers() []peer.ID {
	var peers []peer.ID
445
	rt.tabLock.RLock()
Aarsh Shah's avatar
Aarsh Shah committed
446
	for _, buck := range rt.buckets {
447
		peers = append(peers, buck.peerIds()...)
448
	}
449
	rt.tabLock.RUnlock()
450 451
	return peers
}
452

Chas Leichner's avatar
Chas Leichner committed
453 454
// Print prints a descriptive statement about the provided RoutingTable
func (rt *RoutingTable) Print() {
455 456
	fmt.Printf("Routing Table, bs = %d, Max latency = %d\n", rt.bucketsize, rt.maxLatency)
	rt.tabLock.RLock()
457

Aarsh Shah's avatar
Aarsh Shah committed
458
	for i, b := range rt.buckets {
459 460 461 462 463 464
		fmt.Printf("\tbucket: %d\n", i)

		for e := b.list.Front(); e != nil; e = e.Next() {
			p := e.Value.(peer.ID)
			fmt.Printf("\t\t- %s %s\n", p.Pretty(), rt.metrics.LatencyEWMA(p).String())
		}
465
	}
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
466
	rt.tabLock.RUnlock()
467
}
468 469 470 471 472 473

// the caller is responsible for the locking
func (rt *RoutingTable) bucketIdForPeer(p peer.ID) int {
	peerID := ConvertPeerID(p)
	cpl := CommonPrefixLen(peerID, rt.local)
	bucketID := cpl
Aarsh Shah's avatar
Aarsh Shah committed
474 475
	if bucketID >= len(rt.buckets) {
		bucketID = len(rt.buckets) - 1
476 477 478
	}
	return bucketID
}