// Package kbucket implements a kademlia 'k-bucket' routing table.
package kbucket

import (
	"context"
	"encoding/binary"
	"errors"
	"fmt"
	"math/rand"
	"sync"
	"time"

	"github.com/libp2p/go-libp2p-core/peer"
	"github.com/libp2p/go-libp2p-core/peerstore"

	logging "github.com/ipfs/go-log"
	mh "github.com/multiformats/go-multihash"
)

var log = logging.Logger("table")

var ErrPeerRejectedHighLatency = errors.New("peer rejected; latency too high")
var ErrPeerRejectedNoCapacity = errors.New("peer rejected; insufficient capacity")

// PeerSelectionFnc is the signature of a function that selects zero or more peers from the given peers
// based on some criteria.
type PeerSelectionFnc func(peers []PeerInfo) []PeerInfo

// PeerValidationFnc is the signature of a function that determines the validity of a peer for Routing Table membership.
type PeerValidationFnc func(ctx context.Context, p peer.ID) bool

// maxCplForRefresh is the maximum cpl we support for refresh.
// This limit exists because we can only generate 'maxCplForRefresh' bit prefixes for now.
const maxCplForRefresh uint = 15

// CplRefresh contains a CPL (common prefix length) with the host & the last time
// we refreshed that cpl/searched for an ID which has that cpl with the host.
type CplRefresh struct {
	Cpl           uint
	LastRefreshAt time.Time
}

// RoutingTable defines the routing table.
type RoutingTable struct {
	ctx context.Context

	// ID of the local peer
	local ID

	// Blanket lock, refine later for better performance
	tabLock sync.RWMutex

	// latency metrics
	metrics peerstore.Metrics

	// Maximum acceptable latency for peers in this cluster
	maxLatency time.Duration

	// kBuckets define all the fingers to other nodes.
	Buckets    []*bucket
	bucketsize int

	cplRefreshLk   sync.RWMutex
	cplRefreshedAt map[uint]time.Time

	// replacement candidates for a Cpl
	cplReplacementCache *cplReplacementCache

	// notification functions
	PeerRemoved func(peer.ID)
	PeerAdded   func(peer.ID)

	// function to determine the validity of a peer for RT membership
	PeerValidationFnc PeerValidationFnc

	// timeout for a single call to the peer validation function
	peerValidationTimeout time.Duration
	// interval between two runs of the table cleanup routine
	tableCleanupInterval time.Duration
	// function to select peers that need to be validated
	peersForValidationFnc PeerSelectionFnc
}

// NewRoutingTable creates a new routing table with a given bucketsize, local ID, and latency tolerance.
// Passing a nil PeerValidationFnc disables periodic table cleanup.
func NewRoutingTable(ctx context.Context, bucketsize int, localID ID, latency time.Duration, m peerstore.Metrics,
	peerValidationFnc PeerValidationFnc, options ...Option) (*RoutingTable, error) {

	var cfg Options
	if err := cfg.Apply(append([]Option{Defaults}, options...)...); err != nil {
		return nil, err
	}

	rt := &RoutingTable{
		ctx:        ctx,
		Buckets:    []*bucket{newBucket()},
		bucketsize: bucketsize,
		local:      localID,

		maxLatency: latency,
		metrics:    m,

		cplRefreshedAt: make(map[uint]time.Time),

		PeerRemoved: func(peer.ID) {},
		PeerAdded:   func(peer.ID) {},

		PeerValidationFnc:     peerValidationFnc,
		peersForValidationFnc: cfg.TableCleanup.PeersForValidationFnc,
		peerValidationTimeout: cfg.TableCleanup.PeerValidationTimeout,
		tableCleanupInterval:  cfg.TableCleanup.Interval,
	}

	rt.cplReplacementCache = newCplReplacementCache(rt.local, rt.bucketsize)

	// schedule periodic RT cleanup
	if peerValidationFnc != nil {
		go rt.cleanup()
	}

	return rt, nil
}
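
// An illustrative construction sketch (not part of the original file; the
// host `h`, `ctx`, and the dial-based liveness check are assumptions):
//
//	validate := func(ctx context.Context, p peer.ID) bool {
//		// e.g. treat a successful fresh connection as proof of liveness
//		return h.Connect(ctx, peer.AddrInfo{ID: p}) == nil
//	}
//	rt, err := NewRoutingTable(ctx, 20, ConvertPeerID(h.ID()), time.Minute,
//		h.Peerstore(), validate)
//	if err != nil {
//		panic(err)
//	}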

func (rt *RoutingTable) cleanup() {
	validatePeerF := func(p peer.ID) bool {
		queryCtx, cancel := context.WithTimeout(rt.ctx, rt.peerValidationTimeout)
		defer cancel()
		return rt.PeerValidationFnc(queryCtx, p)
	}

	cleanupTickr := time.NewTicker(rt.tableCleanupInterval)
	defer cleanupTickr.Stop()
	for {
		select {
		case <-rt.ctx.Done():
			return
		case <-cleanupTickr.C:
			ps := rt.peersToValidate()
			for _, pinfo := range ps {
				// continue if we are able to successfully validate the peer;
				// it will be marked alive in the RT when the DHT connection notification handler calls RT.HandlePeerAlive()
				// TODO: Should we revisit this? It makes more sense for the RT to mark it as active here.
				if validatePeerF(pinfo.Id) {
					log.Infof("successfully validated missing peer=%s", pinfo.Id)
					continue
				}

				// peer does not seem to be alive, let's try candidates now
				log.Infof("failed to validate missing peer=%s, will try candidates now...", pinfo.Id)
				// evict missing peer
				rt.HandlePeerDead(pinfo.Id)

				// keep trying replacement candidates for the missing peer till we get a successful validation or
				// we run out of candidates
				cpl := uint(CommonPrefixLen(ConvertPeerID(pinfo.Id), rt.local))
				c, notEmpty := rt.cplReplacementCache.pop(cpl)
				for notEmpty {
					if validatePeerF(c) {
						log.Infof("successfully validated candidate=%s for missing peer=%s", c, pinfo.Id)
						break
					}
					log.Infof("failed to validated candidate=%s", c)
					// remove candidate
					rt.HandlePeerDead(c)

					c, notEmpty = rt.cplReplacementCache.pop(cpl)
				}

				if !notEmpty {
					log.Infof("failed to replace missing peer=%s as all candidates were invalid", pinfo.Id)
				}
			}
		}
	}
}

// returns the peers that need to be validated.
func (rt *RoutingTable) peersToValidate() []PeerInfo {
	rt.tabLock.RLock()
	defer rt.tabLock.RUnlock()

	var peers []PeerInfo
	for _, b := range rt.Buckets {
		peers = append(peers, b.peers()...)
	}
	return rt.peersForValidationFnc(peers)
}

// GetTrackedCplsForRefresh returns the Cpls we are tracking for refresh.
// Caller is free to modify the returned slice as it is a defensive copy.
func (rt *RoutingTable) GetTrackedCplsForRefresh() []CplRefresh {
	rt.cplRefreshLk.RLock()
	defer rt.cplRefreshLk.RUnlock()

	cpls := make([]CplRefresh, 0, len(rt.cplRefreshedAt))

	for c, t := range rt.cplRefreshedAt {
		cpls = append(cpls, CplRefresh{c, t})
	}

	return cpls
}
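
// A hedged sketch of how a refresh worker might consume the data above; the
// loop, the refreshInterval value, and the lookup step are assumptions, not
// part of this package:
//
//	for _, cr := range rt.GetTrackedCplsForRefresh() {
//		if time.Since(cr.LastRefreshAt) > refreshInterval {
//			randPeer, err := rt.GenRandPeerID(cr.Cpl)
//			if err != nil {
//				continue
//			}
//			// ... run a DHT lookup for randPeer to repopulate that bucket
//		}
//	}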

// GenRandPeerID generates a random peerID for a given Cpl
func (rt *RoutingTable) GenRandPeerID(targetCpl uint) (peer.ID, error) {
	if targetCpl > maxCplForRefresh {
		return "", fmt.Errorf("cannot generate peer ID for Cpl greater than %d", maxCplForRefresh)
	}

	localPrefix := binary.BigEndian.Uint16(rt.local)

	// For a host with ID `L`, an ID `K` belongs to a bucket with ID `B` ONLY IF CommonPrefixLen(L,K) is EXACTLY B.
	// Hence, to achieve a target CPL `T`, we must toggle the (T+1)th bit in L & then copy the first (T+1) bits of
	// the result to our randomly generated prefix.
	toggledLocalPrefix := localPrefix ^ (uint16(0x8000) >> targetCpl)
	randPrefix := uint16(rand.Uint32())

	// Combine the toggled local prefix and the random bits at the correct offset
	// such that ONLY the first `targetCpl` bits match the local ID.
	mask := (^uint16(0)) << (16 - (targetCpl + 1))
	targetPrefix := (toggledLocalPrefix & mask) | (randPrefix & ^mask)
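
	// Worked example (illustrative values, not from the original source):
	// with targetCpl = 2 and localPrefix = 0b1010_1111_0000_0000:
	//   toggledLocalPrefix = localPrefix ^ (0x8000 >> 2) = 0b1000_1111_0000_0000
	//   mask               = 0xFFFF << 13                = 0b1110_0000_0000_0000
	// targetPrefix keeps the top 3 toggled bits (100, vs. local 101) and
	// randomizes the other 13, so any ID with this prefix shares exactly
	// 2 leading bits with the local ID.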

	// Convert to a known peer ID.
	key := keyPrefixMap[targetPrefix]
	id := [34]byte{mh.SHA2_256, 32}
	binary.BigEndian.PutUint32(id[2:], key)
	return peer.ID(id[:]), nil
}

// ResetCplRefreshedAtForID resets the refresh time for the Cpl of the given ID.
func (rt *RoutingTable) ResetCplRefreshedAtForID(id ID, newTime time.Time) {
	cpl := CommonPrefixLen(id, rt.local)
	if uint(cpl) > maxCplForRefresh {
		return
	}

	rt.cplRefreshLk.Lock()
	defer rt.cplRefreshLk.Unlock()

	rt.cplRefreshedAt[uint(cpl)] = newTime
}

// HandlePeerDisconnect should be called when the caller detects a disconnection with the peer.
// This enables the Routing Table to mark the peer as missing.
func (rt *RoutingTable) HandlePeerDisconnect(p peer.ID) {
	rt.tabLock.Lock()
	defer rt.tabLock.Unlock()

	// mark the peer as missing
	bucketId := rt.bucketIdForPeer(p)
	b := rt.Buckets[bucketId]
	if peer, has := b.getPeer(p); has {
		peer.State = PeerStateMissing
		b.replace(peer)
	}
}

// HandlePeerAlive should be called when the caller detects that a peer is alive.
// This could be a successful incoming/outgoing connection with the peer or even a successful message delivery to/from the peer.
// This enables the RT to update its internal state to mark the peer as active.
func (rt *RoutingTable) HandlePeerAlive(p peer.ID) (evicted peer.ID, err error) {
	rt.tabLock.Lock()
	defer rt.tabLock.Unlock()

	bucketID := rt.bucketIdForPeer(p)
	bucket := rt.Buckets[bucketID]
	if peer, has := bucket.getPeer(p); has {
		// mark the peer as active if it was missing
		if peer.State == PeerStateMissing {
			peer.State = PeerStateActive
			bucket.replace(peer)
		}

		// If the peer is already in the table, move it to the front.
		// This signifies that it is "more active" and the less active nodes
		// will, as a result, tend towards the back of the list.
		bucket.moveToFront(p)
		return "", nil
	}

	if rt.metrics.LatencyEWMA(p) > rt.maxLatency {
		// Connection doesn't meet requirements, skip!
		return "", ErrPeerRejectedHighLatency
	}

	// If the bucket has free capacity, add the peer.
	if bucket.len() < rt.bucketsize {
		bucket.pushFront(PeerInfo{p, PeerStateActive})
		rt.PeerAdded(p)
		return "", nil
	}

	if bucketID == len(rt.Buckets)-1 {
		// if the bucket is too large and this is the last bucket (i.e. wildcard), unfold it.
		rt.nextBucket()
		// the structure of the table has changed, so let's recheck if the peer now has a dedicated bucket.
		bucketID = rt.bucketIdForPeer(p)
		bucket = rt.Buckets[bucketID]

		// push the peer only if the bucket isn't overflowing after splitting
		if bucket.len() < rt.bucketsize {
			bucket.pushFront(PeerInfo{p, PeerStateActive})
			rt.PeerAdded(p)
			return "", nil
		}
	}

	// try to push it as a candidate in the replacement cache
	rt.cplReplacementCache.push(p)

	return "", ErrPeerRejectedNoCapacity
}
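
// A minimal caller sketch (assumed wiring, not part of this package): a DHT
// would typically invoke HandlePeerAlive from its network notifiee, e.g.
//
//	func (nn *netNotifiee) Connected(n network.Network, c network.Conn) {
//		if _, err := nn.rt.HandlePeerAlive(c.RemotePeer()); err != nil {
//			log.Debugf("peer %s not added to the table: %s", c.RemotePeer(), err)
//		}
//	}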

// HandlePeerDead should be called when the caller is sure that a peer is dead/not dialable.
// It evicts the peer from the Routing Table and also removes it as a replacement candidate if it is one.
func (rt *RoutingTable) HandlePeerDead(p peer.ID) {
	// remove it as a candidate
	rt.cplReplacementCache.remove(p)

	// remove it from the RT
	rt.tabLock.Lock()
	defer rt.tabLock.Unlock()
	bucketID := rt.bucketIdForPeer(p)
	bucket := rt.Buckets[bucketID]
	if bucket.remove(p) {
		rt.PeerRemoved(p)
	}
}

func (rt *RoutingTable) nextBucket() {
	// This is the last bucket, which allegedly is a mixed bag containing peers not belonging in dedicated (unfolded) buckets.
	// _allegedly_ is used here to denote that *all* peers in the last bucket might feasibly belong to another bucket.
	// This could happen if e.g. we've unfolded 4 buckets, and all peers in folded bucket 5 really belong in bucket 8.
	bucket := rt.Buckets[len(rt.Buckets)-1]
	newBucket := bucket.split(len(rt.Buckets)-1, rt.local)
	rt.Buckets = append(rt.Buckets, newBucket)

	// The newly formed bucket still contains too many peers. We probably just unfolded an empty bucket.
	if newBucket.len() >= rt.bucketsize {
		// Keep unfolding the table until the last bucket is not overflowing.
		rt.nextBucket()
	}
}

// Find a specific peer by ID or return "" if not found
func (rt *RoutingTable) Find(id peer.ID) peer.ID {
	srch := rt.NearestPeers(ConvertPeerID(id), 1)
	if len(srch) == 0 || srch[0] != id {
		return ""
	}
	return srch[0]
}

// NearestPeer returns a single peer that is nearest to the given ID
func (rt *RoutingTable) NearestPeer(id ID) peer.ID {
	peers := rt.NearestPeers(id, 1)
	if len(peers) > 0 {
		return peers[0]
	}

	log.Debugf("NearestPeer: Returning nil, table size = %d", rt.Size())
	return ""
}

// NearestPeers returns a list of the 'count' closest peers to the given ID
func (rt *RoutingTable) NearestPeers(id ID, count int) []peer.ID {
	// This is the number of bits _we_ share with the key. All peers in this
	// bucket share cpl bits with us and will therefore share at least cpl+1
	// bits with the given key. +1 because both the target and all peers in
	// this bucket differ from us in the cpl bit.
	cpl := CommonPrefixLen(id, rt.local)

	// It's assumed that this also protects the buckets.
	rt.tabLock.RLock()

	// Get the bucket index or use the last bucket
	if cpl >= len(rt.Buckets) {
		cpl = len(rt.Buckets) - 1
	}

	pds := peerDistanceSorter{
		peers:  make([]peerDistance, 0, count+rt.bucketsize),
		target: id,
	}

	// Add peers from the target bucket (cpl+1 shared bits).
	pds.appendPeersFromList(rt.Buckets[cpl].list)

	// If we're short, add peers from buckets to the right until we have
	// enough. All buckets to the right share exactly cpl bits (as opposed
	// to the cpl+1 bits shared by the peers in the cpl bucket).
	//
	// Unfortunately, to be completely correct, we can't just take from
	// buckets until we have enough peers because _all_ of these peers
	// will be ~2**(256-cpl) from us.
	//
	// However, we're going to do that anyways as it's "good enough"

	for i := cpl + 1; i < len(rt.Buckets) && pds.Len() < count; i++ {
		pds.appendPeersFromList(rt.Buckets[i].list)
	}

	// If we're still short, add in buckets that share _fewer_ bits. We can
	// do this bucket by bucket because each bucket will share 1 fewer bit
	// than the last.
	//
	// * bucket cpl-1: cpl-1 shared bits.
	// * bucket cpl-2: cpl-2 shared bits.
	// ...
	for i := cpl - 1; i >= 0 && pds.Len() < count; i-- {
		pds.appendPeersFromList(rt.Buckets[i].list)
	}
	rt.tabLock.RUnlock()

	// Sort by distance to the target
	pds.sort()

	if count < pds.Len() {
		pds.peers = pds.peers[:count]
	}

	out := make([]peer.ID, 0, pds.Len())
	for _, p := range pds.peers {
		out = append(out, p.p)
	}

	return out
}
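
// An illustrative query sketch; the key string and the count of 20 are
// assumptions for the example:
//
//	// arbitrary keys must first be mapped into the XOR keyspace
//	closest := rt.NearestPeers(ConvertKey("/v/some-record-key"), 20)
//	for _, p := range closest {
//		// ... query each candidate peer
//	}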

// Size returns the total number of peers in the routing table
func (rt *RoutingTable) Size() int {
	var tot int
	rt.tabLock.RLock()
	for _, buck := range rt.Buckets {
		tot += buck.len()
	}
	rt.tabLock.RUnlock()
	return tot
}

// ListPeers returns a list of all peers from all buckets in the table.
func (rt *RoutingTable) ListPeers() []peer.ID {
	var peers []peer.ID
	rt.tabLock.RLock()
	for _, buck := range rt.Buckets {
		peers = append(peers, buck.peerIds()...)
	}
	rt.tabLock.RUnlock()
	return peers
}

// Print prints a descriptive statement about the provided RoutingTable
func (rt *RoutingTable) Print() {
	fmt.Printf("Routing Table, bs = %d, Max latency = %d\n", rt.bucketsize, rt.maxLatency)
	rt.tabLock.RLock()

	for i, b := range rt.Buckets {
		fmt.Printf("\tbucket: %d\n", i)

		b.lk.RLock()
		for e := b.list.Front(); e != nil; e = e.Next() {
			p := e.Value.(peer.ID)
			fmt.Printf("\t\t- %s %s\n", p.Pretty(), rt.metrics.LatencyEWMA(p).String())
		}
		b.lk.RUnlock()
	}
	rt.tabLock.RUnlock()
}

// the caller is responsible for the locking
func (rt *RoutingTable) bucketIdForPeer(p peer.ID) int {
	peerID := ConvertPeerID(p)
	cpl := CommonPrefixLen(peerID, rt.local)
	bucketID := cpl
	if bucketID >= len(rt.Buckets) {
		bucketID = len(rt.Buckets) - 1
	}
	return bucketID
}
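
// For illustration (assumed numbers, not from the source): with 5 buckets
// (indices 0..4), a peer whose CPL with the local ID is 10 still maps to the
// last bucket (index 4); that "wildcard" bucket holds every peer with
// CPL >= 4 until nextBucket unfolds the table further.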