table.go 14.3 KB
Newer Older
1 2
// Package kbucket implements a Kademlia 'k-bucket' routing table.
package kbucket
3 4

import (
5
	"context"
6
	"encoding/binary"
7
	"errors"
8
	"fmt"
9
	"math/rand"
10
	"sync"
11
	"time"
12

13 14 15
	"github.com/libp2p/go-libp2p-core/peer"
	"github.com/libp2p/go-libp2p-core/peerstore"

George Antoniadis's avatar
George Antoniadis committed
16
	logging "github.com/ipfs/go-log"
17
	mh "github.com/multiformats/go-multihash"
18 19
)

Jeromy's avatar
Jeromy committed
20
var log = logging.Logger("table")
21

22 23 24
var ErrPeerRejectedHighLatency = errors.New("peer rejected; latency too high")
var ErrPeerRejectedNoCapacity = errors.New("peer rejected; insufficient capacity")

Aarsh Shah's avatar
Aarsh Shah committed
25
// PeerSelectionFunc is the signature of a function that selects zero or more peers from the given peers
26
// based on some criteria.
Aarsh Shah's avatar
Aarsh Shah committed
27
type PeerSelectionFunc func(peers []PeerInfo) []PeerInfo
28

Aarsh Shah's avatar
Aarsh Shah committed
29 30
// PeerValidationFunc reports whether the peer identified by id is valid for
// membership in the routing table.
type PeerValidationFunc func(ctx context.Context, id peer.ID) (valid bool)
31

Aarsh Shah's avatar
Aarsh Shah committed
32 33 34
// maxCplForRefresh is the largest CPL (common prefix length) for which we
// support refreshes. The cap exists because, for now, random IDs can only be
// generated for prefixes of at most this many bits.
const maxCplForRefresh = uint(15)
Aarsh Shah's avatar
Aarsh Shah committed
35

Aarsh Shah's avatar
Aarsh Shah committed
36 37
// CplRefresh pairs a CPL (common prefix length with the local peer) with the
// last time that CPL was refreshed, i.e. the last time we searched for an ID
// sharing exactly that CPL with the local peer.
type CplRefresh struct {
	// Cpl is the common prefix length being tracked.
	Cpl uint

	// LastRefreshAt is when Cpl was last refreshed.
	LastRefreshAt time.Time
}

43 44
// RoutingTable defines the routing table.
type RoutingTable struct {
45
	// the routing table context
46
	ctx context.Context
47 48 49
	// function to cancel the RT context
	ctxCancel context.CancelFunc

50 51 52 53 54 55
	// ID of the local peer
	local ID

	// Blanket lock, refine later for better performance
	tabLock sync.RWMutex

56
	// latency metrics
57
	metrics peerstore.Metrics
58

59 60 61
	// Maximum acceptable latency for peers in this cluster
	maxLatency time.Duration

62
	// kBuckets define all the fingers to other nodes.
Aarsh Shah's avatar
Aarsh Shah committed
63
	buckets    []*bucket
64
	bucketsize int
Jeromy's avatar
Jeromy committed
65

Aarsh Shah's avatar
Aarsh Shah committed
66 67 68
	cplRefreshLk   sync.RWMutex
	cplRefreshedAt map[uint]time.Time

69 70 71
	// replacement candidates for a Cpl
	cplReplacementCache *cplReplacementCache

Jeromy's avatar
Jeromy committed
72 73 74
	// notification functions
	PeerRemoved func(peer.ID)
	PeerAdded   func(peer.ID)
75

76 77 78 79 80
	// is peer replacement enabled ?
	isReplaceEnabled bool
	// peerReplaceCh is the channel to write a peer replacement request to
	peerReplaceCh chan peer.ID

81
	// function to determine the validity of a peer for RT membership
Aarsh Shah's avatar
Aarsh Shah committed
82
	peerValidationFnc PeerValidationFunc
83 84
	// timeout for a single call to the peer validation function
	peerValidationTimeout time.Duration
85

86 87
	// interval between two runs of the table cleanup routine
	tableCleanupInterval time.Duration
88
	// function to select peers that need to be validated during cleanup
Aarsh Shah's avatar
Aarsh Shah committed
89
	peersForValidationFnc PeerSelectionFunc
90 91
}

Chas Leichner's avatar
Chas Leichner committed
92
// NewRoutingTable creates a new routing table with a given bucketsize, local ID, and latency tolerance.
Aarsh Shah's avatar
Aarsh Shah committed
93
// Passing a nil PeerValidationFunc disables periodic table cleanup.
Aarsh Shah's avatar
Aarsh Shah committed
94
func NewRoutingTable(bucketsize int, localID ID, latency time.Duration, m peerstore.Metrics,
Aarsh Shah's avatar
Aarsh Shah committed
95
	opts ...Option) (*RoutingTable, error) {
96

Aarsh Shah's avatar
Aarsh Shah committed
97
	var cfg options
Aarsh Shah's avatar
Aarsh Shah committed
98
	if err := cfg.apply(append([]Option{Defaults}, opts...)...); err != nil {
99 100 101
		return nil, err
	}

Jeromy's avatar
Jeromy committed
102
	rt := &RoutingTable{
Aarsh Shah's avatar
Aarsh Shah committed
103
		buckets:    []*bucket{newBucket()},
104 105 106 107 108 109
		bucketsize: bucketsize,
		local:      localID,

		maxLatency: latency,
		metrics:    m,

Aarsh Shah's avatar
Aarsh Shah committed
110
		cplRefreshedAt: make(map[uint]time.Time),
111 112 113 114

		PeerRemoved: func(peer.ID) {},
		PeerAdded:   func(peer.ID) {},

115 116
		peerReplaceCh: make(chan peer.ID, bucketsize*2),

Aarsh Shah's avatar
Aarsh Shah committed
117 118 119 120
		peerValidationFnc:     cfg.tableCleanup.peerValidationFnc,
		peersForValidationFnc: cfg.tableCleanup.peersForValidationFnc,
		peerValidationTimeout: cfg.tableCleanup.peerValidationTimeout,
		tableCleanupInterval:  cfg.tableCleanup.interval,
Jeromy's avatar
Jeromy committed
121 122
	}

123 124 125 126
	// create the replacement cache
	rt.cplReplacementCache = newCplReplacementCache(rt.local, rt.bucketsize*2)

	rt.ctx, rt.ctxCancel = context.WithCancel(context.Background())
127

Aarsh Shah's avatar
Aarsh Shah committed
128
	// schedule periodic RT cleanup if peer validation function has been passed
129 130 131 132
	rt.isReplaceEnabled = (rt.peerValidationFnc != nil)
	if rt.isReplaceEnabled {
		go rt.cleanup()
		go rt.startPeerReplacement()
133 134 135 136 137
	}

	return rt, nil
}

Aarsh Shah's avatar
Aarsh Shah committed
138 139 140
// Close shuts down the Routing Table & all associated processes.
// It is safe to call this multiple times.
func (rt *RoutingTable) Close() error {
141 142
	rt.ctxCancel()
	return nil
Aarsh Shah's avatar
Aarsh Shah committed
143 144
}

145 146 147 148 149 150
// returns the peers that need to be validated.
func (rt *RoutingTable) peersToValidate() []PeerInfo {
	rt.tabLock.RLock()
	defer rt.tabLock.RUnlock()

	var peers []PeerInfo
Aarsh Shah's avatar
Aarsh Shah committed
151
	for _, b := range rt.buckets {
152 153 154
		peers = append(peers, b.peers()...)
	}
	return rt.peersForValidationFnc(peers)
155 156
}

Aarsh Shah's avatar
Aarsh Shah committed
157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184
// NPeersForCpl returns the number of peers in the routing table whose common
// prefix length (CPL) with the local ID is exactly cpl.
func (rt *RoutingTable) NPeersForCpl(cpl uint) int {
	rt.tabLock.RLock()
	defer rt.tabLock.RUnlock()

	return rt.nPeersForCpl(cpl)
}

// IsBucketFull returns true if the logical bucket for the given CPL holds
// bucketsize peers.
func (rt *RoutingTable) IsBucketFull(cpl uint) bool {
	rt.tabLock.RLock()
	defer rt.tabLock.RUnlock()

	// Use the unlocked helper. Calling NPeersForCpl here would read-lock
	// tabLock a second time, and sync.RWMutex forbids recursive read locking:
	// it deadlocks if a writer queues up between the two RLock calls.
	return rt.nPeersForCpl(cpl) == rt.bucketsize
}

// nPeersForCpl is the lock-free core of NPeersForCpl. The caller must hold
// tabLock (for reading or writing).
func (rt *RoutingTable) nPeersForCpl(cpl uint) int {
	lastIdx := len(rt.buckets) - 1

	// Every CPL below the last index has a dedicated bucket.
	if int(cpl) < lastIdx {
		return rt.buckets[cpl].len()
	}

	// The last bucket also holds peers with larger CPLs that have not been
	// split out yet, so count only the exact matches.
	count := 0
	for _, p := range rt.buckets[lastIdx].peerIds() {
		if CommonPrefixLen(rt.local, ConvertPeerID(p)) == int(cpl) {
			count++
		}
	}
	return count
}

Aarsh Shah's avatar
Aarsh Shah committed
185 186
// GetTrackedCplsForRefresh returns the Cpl's we are tracking for refresh.
// Caller is free to modify the returned slice as it is a defensive copy.
Aarsh Shah's avatar
Aarsh Shah committed
187
func (rt *RoutingTable) GetTrackedCplsForRefresh() []CplRefresh {
Aarsh Shah's avatar
Aarsh Shah committed
188 189
	rt.cplRefreshLk.RLock()
	defer rt.cplRefreshLk.RUnlock()
Aarsh Shah's avatar
Aarsh Shah committed
190

Aarsh Shah's avatar
Aarsh Shah committed
191
	cpls := make([]CplRefresh, 0, len(rt.cplRefreshedAt))
Aarsh Shah's avatar
Aarsh Shah committed
192 193

	for c, t := range rt.cplRefreshedAt {
Aarsh Shah's avatar
Aarsh Shah committed
194
		cpls = append(cpls, CplRefresh{c, t})
195 196
	}

Aarsh Shah's avatar
Aarsh Shah committed
197 198 199 200 201
	return cpls
}

// GenRandPeerID generates a random peerID for a given Cpl
func (rt *RoutingTable) GenRandPeerID(targetCpl uint) (peer.ID, error) {
Aarsh Shah's avatar
Aarsh Shah committed
202 203
	if targetCpl > maxCplForRefresh {
		return "", fmt.Errorf("cannot generate peer ID for Cpl greater than %d", maxCplForRefresh)
204 205
	}

206
	localPrefix := binary.BigEndian.Uint16(rt.local)
Aarsh Shah's avatar
Aarsh Shah committed
207 208 209 210 211 212 213 214 215 216 217

	// For host with ID `L`, an ID `K` belongs to a bucket with ID `B` ONLY IF CommonPrefixLen(L,K) is EXACTLY B.
	// Hence, to achieve a targetPrefix `T`, we must toggle the (T+1)th bit in L & then copy (T+1) bits from L
	// to our randomly generated prefix.
	toggledLocalPrefix := localPrefix ^ (uint16(0x8000) >> targetCpl)
	randPrefix := uint16(rand.Uint32())

	// Combine the toggled local prefix and the random bits at the correct offset
	// such that ONLY the first `targetCpl` bits match the local ID.
	mask := (^uint16(0)) << (16 - (targetCpl + 1))
	targetPrefix := (toggledLocalPrefix & mask) | (randPrefix & ^mask)
218

219 220 221
	// Convert to a known peer ID.
	key := keyPrefixMap[targetPrefix]
	id := [34]byte{mh.SHA2_256, 32}
222
	binary.BigEndian.PutUint32(id[2:], key)
Aarsh Shah's avatar
Aarsh Shah committed
223
	return peer.ID(id[:]), nil
224 225
}

Aarsh Shah's avatar
Aarsh Shah committed
226 227
// ResetCplRefreshedAtForID resets the refresh time for the Cpl of the given ID.
func (rt *RoutingTable) ResetCplRefreshedAtForID(id ID, newTime time.Time) {
228
	cpl := CommonPrefixLen(id, rt.local)
Aarsh Shah's avatar
Aarsh Shah committed
229
	if uint(cpl) > maxCplForRefresh {
Aarsh Shah's avatar
Aarsh Shah committed
230
		return
231 232
	}

Aarsh Shah's avatar
Aarsh Shah committed
233 234 235 236
	rt.cplRefreshLk.Lock()
	defer rt.cplRefreshLk.Unlock()

	rt.cplRefreshedAt[uint(cpl)] = newTime
237 238
}

239 240 241
// HandlePeerDisconnect should be called when the caller detects a disconnection with the peer.
// This enables the Routing Table to mark the peer as missing.
func (rt *RoutingTable) HandlePeerDisconnect(p peer.ID) {
242 243
	rt.tabLock.Lock()
	defer rt.tabLock.Unlock()
244 245

	bucketId := rt.bucketIdForPeer(p)
Aarsh Shah's avatar
Aarsh Shah committed
246 247 248
	// mark the peer as missing
	b := rt.buckets[bucketId]
	if peer := b.getPeer(p); peer != nil {
249
		peer.State = PeerStateMissing
250
	}
251 252 253 254 255 256 257 258
}

// HandlePeerAlive should be called when the caller detects that a peer is alive.
// This could be a successful incoming/outgoing connection with the peer or even a successful message delivery to/from the peer.
// This enables the RT to update it's internal state to mark the peer as active.
func (rt *RoutingTable) HandlePeerAlive(p peer.ID) (evicted peer.ID, err error) {
	rt.tabLock.Lock()
	defer rt.tabLock.Unlock()
259

260 261 262 263 264
	return rt.addPeer(p)
}

// locking is the responsibility of the caller
func (rt *RoutingTable) addPeer(p peer.ID) (evicted peer.ID, err error) {
265
	bucketID := rt.bucketIdForPeer(p)
Aarsh Shah's avatar
Aarsh Shah committed
266 267 268 269
	bucket := rt.buckets[bucketID]
	if peer := bucket.getPeer(p); peer != nil {
		// mark the peer as active
		peer.State = PeerStateActive
270

271
		return "", nil
272 273 274 275
	}

	if rt.metrics.LatencyEWMA(p) > rt.maxLatency {
		// Connection doesnt meet requirements, skip!
276 277 278 279
		return "", ErrPeerRejectedHighLatency
	}

	// We have enough space in the bucket (whether spawned or grouped).
280
	if bucket.len() < rt.bucketsize {
Aarsh Shah's avatar
Aarsh Shah committed
281
		bucket.pushFront(&PeerInfo{p, PeerStateActive})
282 283
		rt.PeerAdded(p)
		return "", nil
284 285
	}

Aarsh Shah's avatar
Aarsh Shah committed
286
	if bucketID == len(rt.buckets)-1 {
287 288 289
		// if the bucket is too large and this is the last bucket (i.e. wildcard), unfold it.
		rt.nextBucket()
		// the structure of the table has changed, so let's recheck if the peer now has a dedicated bucket.
290
		bucketID = rt.bucketIdForPeer(p)
Aarsh Shah's avatar
Aarsh Shah committed
291
		bucket = rt.buckets[bucketID]
292 293 294

		// push the peer only if the bucket isn't overflowing after slitting
		if bucket.len() < rt.bucketsize {
Aarsh Shah's avatar
Aarsh Shah committed
295
			bucket.pushFront(&PeerInfo{p, PeerStateActive})
296 297
			rt.PeerAdded(p)
			return "", nil
298 299
		}
	}
300 301
	// try to push it as a candidate in the replacement cache
	rt.cplReplacementCache.push(p)
302 303

	return "", ErrPeerRejectedNoCapacity
304 305
}

306
// HandlePeerDead should be called when the caller is sure that a peer is dead/not dialable.
307 308
// It evicts the peer from the Routing Table and tries to replace it with a valid & eligible
// candidate from the replacement cache.
309
func (rt *RoutingTable) HandlePeerDead(p peer.ID) {
Steven Allen's avatar
Steven Allen committed
310 311
	rt.tabLock.Lock()
	defer rt.tabLock.Unlock()
312 313 314 315 316
	rt.removePeer(p)
}

// locking is the responsibility of the caller
func (rt *RoutingTable) removePeer(p peer.ID) {
317
	bucketID := rt.bucketIdForPeer(p)
Aarsh Shah's avatar
Aarsh Shah committed
318
	bucket := rt.buckets[bucketID]
319
	if bucket.remove(p) {
320 321 322 323 324 325 326 327 328 329
		// request a replacement
		if rt.isReplaceEnabled {
			select {
			case rt.peerReplaceCh <- p:
			default:
				log.Errorf("unable to request replacement for peer=%s as queue for replace requests is full", p)
			}
		}

		// peer removed callback
330 331
		rt.PeerRemoved(p)
	}
332 333
}

334
func (rt *RoutingTable) nextBucket() {
335 336 337
	// This is the last bucket, which allegedly is a mixed bag containing peers not belonging in dedicated (unfolded) buckets.
	// _allegedly_ is used here to denote that *all* peers in the last bucket might feasibly belong to another bucket.
	// This could happen if e.g. we've unfolded 4 buckets, and all peers in folded bucket 5 really belong in bucket 8.
Aarsh Shah's avatar
Aarsh Shah committed
338 339 340
	bucket := rt.buckets[len(rt.buckets)-1]
	newBucket := bucket.split(len(rt.buckets)-1, rt.local)
	rt.buckets = append(rt.buckets, newBucket)
341

342
	// The newly formed bucket still contains too many peers. We probably just unfolded a empty bucket.
343
	if newBucket.len() >= rt.bucketsize {
344 345
		// Keep unfolding the table until the last bucket is not overflowing.
		rt.nextBucket()
346 347 348
	}
}

Jeromy's avatar
Jeromy committed
349
// Find a specific peer by ID or return nil
350
func (rt *RoutingTable) Find(id peer.ID) peer.ID {
Chas Leichner's avatar
Chas Leichner committed
351
	srch := rt.NearestPeers(ConvertPeerID(id), 1)
352 353
	if len(srch) == 0 || srch[0] != id {
		return ""
Jeromy's avatar
Jeromy committed
354 355 356 357
	}
	return srch[0]
}

358
// NearestPeer returns a single peer that is nearest to the given ID
359
func (rt *RoutingTable) NearestPeer(id ID) peer.ID {
360 361 362 363
	peers := rt.NearestPeers(id, 1)
	if len(peers) > 0 {
		return peers[0]
	}
364

Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
365
	log.Debugf("NearestPeer: Returning nil, table size = %d", rt.Size())
366
	return ""
367 368
}

369
// NearestPeers returns a list of the 'count' closest peers to the given ID
370
func (rt *RoutingTable) NearestPeers(id ID, count int) []peer.ID {
371 372 373 374
	// This is the number of bits _we_ share with the key. All peers in this
	// bucket share cpl bits with us and will therefore share at least cpl+1
	// bits with the given key. +1 because both the target and all peers in
	// this bucket differ from us in the cpl bit.
Matt Joiner's avatar
Matt Joiner committed
375
	cpl := CommonPrefixLen(id, rt.local)
376

377
	// It's assumed that this also protects the buckets.
Jeromy's avatar
Jeromy committed
378 379
	rt.tabLock.RLock()

Aarsh Shah's avatar
Aarsh Shah committed
380 381 382
	// Get bucket index or last bucket
	if cpl >= len(rt.buckets) {
		cpl = len(rt.buckets) - 1
383 384
	}

385
	pds := peerDistanceSorter{
386
		peers:  make([]peerDistance, 0, count+rt.bucketsize),
387 388
		target: id,
	}
389

390
	// Add peers from the target bucket (cpl+1 shared bits).
Aarsh Shah's avatar
Aarsh Shah committed
391
	pds.appendPeersFromList(rt.buckets[cpl].list)
392

393 394 395
	// If we're short, add peers from buckets to the right until we have
	// enough. All buckets to the right share exactly cpl bits (as opposed
	// to the cpl+1 bits shared by the peers in the cpl bucket).
396 397 398 399 400
	//
	// Unfortunately, to be completely correct, we can't just take from
	// buckets until we have enough peers because peers because _all_ of
	// these peers will be ~2**(256-cpl) from us.
	//
401 402
	// However, we're going to do that anyways as it's "good enough"

Aarsh Shah's avatar
Aarsh Shah committed
403 404
	for i := cpl + 1; i < len(rt.buckets) && pds.Len() < count; i++ {
		pds.appendPeersFromList(rt.buckets[i].list)
405 406
	}

407 408 409 410
	// If we're still short, add in buckets that share _fewer_ bits. We can
	// do this bucket by bucket because each bucket will share 1 fewer bit
	// than the last.
	//
Aarsh Shah's avatar
fix doc  
Aarsh Shah committed
411 412
	// * bucket cpl-1: cpl-1 shared bits.
	// * bucket cpl-2: cpl-2 shared bits.
413
	// ...
414
	for i := cpl - 1; i >= 0 && pds.Len() < count; i-- {
Aarsh Shah's avatar
Aarsh Shah committed
415
		pds.appendPeersFromList(rt.buckets[i].list)
416
	}
Jeromy's avatar
Jeromy committed
417
	rt.tabLock.RUnlock()
418 419

	// Sort by distance to local peer
420
	pds.sort()
421

422 423
	if count < pds.Len() {
		pds.peers = pds.peers[:count]
424 425
	}

426 427
	out := make([]peer.ID, 0, pds.Len())
	for _, p := range pds.peers {
428
		out = append(out, p.p)
429 430 431 432 433
	}

	return out
}

434
// Size returns the total number of peers in the routing table
435 436
func (rt *RoutingTable) Size() int {
	var tot int
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
437
	rt.tabLock.RLock()
Aarsh Shah's avatar
Aarsh Shah committed
438
	for _, buck := range rt.buckets {
439
		tot += buck.len()
440
	}
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
441
	rt.tabLock.RUnlock()
442 443 444
	return tot
}

Chas Leichner's avatar
Chas Leichner committed
445
// ListPeers takes a RoutingTable and returns a list of all peers from all buckets in the table.
446
func (rt *RoutingTable) ListPeers() []peer.ID {
447
	rt.tabLock.RLock()
448 449 450
	defer rt.tabLock.RUnlock()

	var peers []peer.ID
Aarsh Shah's avatar
Aarsh Shah committed
451
	for _, buck := range rt.buckets {
452
		peers = append(peers, buck.peerIds()...)
453 454 455
	}
	return peers
}
456

Chas Leichner's avatar
Chas Leichner committed
457 458
// Print prints a descriptive statement about the provided RoutingTable
func (rt *RoutingTable) Print() {
459 460
	fmt.Printf("Routing Table, bs = %d, Max latency = %d\n", rt.bucketsize, rt.maxLatency)
	rt.tabLock.RLock()
461

Aarsh Shah's avatar
Aarsh Shah committed
462
	for i, b := range rt.buckets {
463 464 465
		fmt.Printf("\tbucket: %d\n", i)

		for e := b.list.Front(); e != nil; e = e.Next() {
Aarsh Shah's avatar
Aarsh Shah committed
466
			p := e.Value.(*PeerInfo).Id
467 468
			fmt.Printf("\t\t- %s %s\n", p.Pretty(), rt.metrics.LatencyEWMA(p).String())
		}
469
	}
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
470
	rt.tabLock.RUnlock()
471
}
472 473 474 475 476 477

// the caller is responsible for the locking
func (rt *RoutingTable) bucketIdForPeer(p peer.ID) int {
	peerID := ConvertPeerID(p)
	cpl := CommonPrefixLen(peerID, rt.local)
	bucketID := cpl
Aarsh Shah's avatar
Aarsh Shah committed
478 479
	if bucketID >= len(rt.buckets) {
		bucketID = len(rt.buckets) - 1
480 481 482
	}
	return bucketID
}