table.go 15.3 KB
Newer Older
1 2
// package kbucket implements a kademlia 'k-bucket' routing table.
package kbucket
3 4

import (
5
	"context"
6
	"encoding/binary"
7
	"errors"
8
	"fmt"
9
	"math/rand"
10
	"sync"
11
	"time"
12

13 14 15
	"github.com/libp2p/go-libp2p-core/peer"
	"github.com/libp2p/go-libp2p-core/peerstore"

George Antoniadis's avatar
George Antoniadis committed
16
	logging "github.com/ipfs/go-log"
Aarsh Shah's avatar
Aarsh Shah committed
17 18
	"github.com/jbenet/goprocess"
	goprocessctx "github.com/jbenet/goprocess/context"
19
	mh "github.com/multiformats/go-multihash"
20 21
)

Jeromy's avatar
Jeromy committed
22
var log = logging.Logger("table")
23

24 25 26
var ErrPeerRejectedHighLatency = errors.New("peer rejected; latency too high")
var ErrPeerRejectedNoCapacity = errors.New("peer rejected; insufficient capacity")

Aarsh Shah's avatar
Aarsh Shah committed
27
// PeerSelectionFunc is the signature of a function that selects zero or more peers from the given peers
28
// based on some criteria.
Aarsh Shah's avatar
Aarsh Shah committed
29
type PeerSelectionFunc func(peers []PeerInfo) []PeerInfo
30

Aarsh Shah's avatar
Aarsh Shah committed
31 32
// PeerValidationFunc is the signature of a function that determines the validity a peer for Routing Table membership.
type PeerValidationFunc func(ctx context.Context, p peer.ID) bool
33

Aarsh Shah's avatar
Aarsh Shah committed
34 35 36
// maxCplForRefresh is the maximum cpl we support for refresh.
// This limit exists because we can only generate 'maxCplForRefresh' bit prefixes for now.
const maxCplForRefresh uint = 15
Aarsh Shah's avatar
Aarsh Shah committed
37

Aarsh Shah's avatar
Aarsh Shah committed
38 39
// CplRefresh contains a CPL(common prefix length) with the host & the last time
// we refreshed that cpl/searched for an ID which has that cpl with the host.
Aarsh Shah's avatar
Aarsh Shah committed
40 41 42 43 44
type CplRefresh struct {
	Cpl           uint
	LastRefreshAt time.Time
}

45 46
// RoutingTable defines the routing table.
type RoutingTable struct {
47
	ctx context.Context
48 49 50 51 52 53
	// ID of the local peer
	local ID

	// Blanket lock, refine later for better performance
	tabLock sync.RWMutex

54
	// latency metrics
55
	metrics peerstore.Metrics
56

57 58 59
	// Maximum acceptable latency for peers in this cluster
	maxLatency time.Duration

60
	// kBuckets define all the fingers to other nodes.
Aarsh Shah's avatar
Aarsh Shah committed
61
	buckets    []*bucket
62
	bucketsize int
Jeromy's avatar
Jeromy committed
63

Aarsh Shah's avatar
Aarsh Shah committed
64 65 66
	cplRefreshLk   sync.RWMutex
	cplRefreshedAt map[uint]time.Time

67 68 69
	// replacement candidates for a Cpl
	cplReplacementCache *cplReplacementCache

Jeromy's avatar
Jeromy committed
70 71 72
	// notification functions
	PeerRemoved func(peer.ID)
	PeerAdded   func(peer.ID)
73 74

	// function to determine the validity of a peer for RT membership
Aarsh Shah's avatar
Aarsh Shah committed
75
	peerValidationFnc PeerValidationFunc
76 77 78 79 80 81

	// timeout for a single call to the peer validation function
	peerValidationTimeout time.Duration
	// interval between two runs of the table cleanup routine
	tableCleanupInterval time.Duration
	// function to select peers that need to be validated
Aarsh Shah's avatar
Aarsh Shah committed
82
	peersForValidationFnc PeerSelectionFunc
Aarsh Shah's avatar
Aarsh Shah committed
83 84

	proc goprocess.Process
85 86
}

Chas Leichner's avatar
Chas Leichner committed
87
// NewRoutingTable creates a new routing table with a given bucketsize, local ID, and latency tolerance.
Aarsh Shah's avatar
Aarsh Shah committed
88
// Passing a nil PeerValidationFunc disables periodic table cleanup.
Aarsh Shah's avatar
Aarsh Shah committed
89
func NewRoutingTable(bucketsize int, localID ID, latency time.Duration, m peerstore.Metrics,
Aarsh Shah's avatar
Aarsh Shah committed
90
	opts ...Option) (*RoutingTable, error) {
91

Aarsh Shah's avatar
Aarsh Shah committed
92
	var cfg options
Aarsh Shah's avatar
Aarsh Shah committed
93
	if err := cfg.Apply(append([]Option{Defaults}, opts...)...); err != nil {
94 95 96
		return nil, err
	}

Jeromy's avatar
Jeromy committed
97
	rt := &RoutingTable{
Aarsh Shah's avatar
Aarsh Shah committed
98
		ctx:        context.Background(),
Aarsh Shah's avatar
Aarsh Shah committed
99
		buckets:    []*bucket{newBucket()},
100 101 102 103 104 105
		bucketsize: bucketsize,
		local:      localID,

		maxLatency: latency,
		metrics:    m,

Aarsh Shah's avatar
Aarsh Shah committed
106
		cplRefreshedAt: make(map[uint]time.Time),
107 108 109 110

		PeerRemoved: func(peer.ID) {},
		PeerAdded:   func(peer.ID) {},

Aarsh Shah's avatar
Aarsh Shah committed
111 112 113 114
		peerValidationFnc:     cfg.tableCleanup.peerValidationFnc,
		peersForValidationFnc: cfg.tableCleanup.peersForValidationFnc,
		peerValidationTimeout: cfg.tableCleanup.peerValidationTimeout,
		tableCleanupInterval:  cfg.tableCleanup.interval,
Jeromy's avatar
Jeromy committed
115 116
	}

117
	rt.cplReplacementCache = newCplReplacementCache(rt.local, rt.bucketsize)
Aarsh Shah's avatar
Aarsh Shah committed
118
	rt.proc = goprocessctx.WithContext(rt.ctx)
119

Aarsh Shah's avatar
Aarsh Shah committed
120 121
	// schedule periodic RT cleanup if peer validation function has been passed
	if rt.peerValidationFnc != nil {
Aarsh Shah's avatar
Aarsh Shah committed
122
		rt.proc.Go(rt.cleanup)
123 124 125 126 127
	}

	return rt, nil
}

Aarsh Shah's avatar
Aarsh Shah committed
128 129 130 131 132 133 134
// Close shuts down the Routing Table & all associated processes.
// It is safe to call this multiple times.
func (rt *RoutingTable) Close() error {
	return rt.proc.Close()
}

func (rt *RoutingTable) cleanup(proc goprocess.Process) {
135 136 137
	validatePeerF := func(p peer.ID) bool {
		queryCtx, cancel := context.WithTimeout(rt.ctx, rt.peerValidationTimeout)
		defer cancel()
Aarsh Shah's avatar
Aarsh Shah committed
138
		return rt.peerValidationFnc(queryCtx, p)
139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180
	}

	cleanupTickr := time.NewTicker(rt.tableCleanupInterval)
	defer cleanupTickr.Stop()
	for {
		select {
		case <-cleanupTickr.C:
			ps := rt.peersToValidate()
			for _, pinfo := range ps {
				// continue if we are able to successfully validate the peer
				// it will be marked alive in the RT when the DHT connection notification handler calls RT.HandlePeerAlive()
				// TODO Should we revisit this ? It makes more sense for the RT to mark it as active here
				if validatePeerF(pinfo.Id) {
					log.Infof("successfully validated missing peer=%s", pinfo.Id)
					continue
				}

				// peer does not seem to be alive, let's try candidates now
				log.Infof("failed to validate missing peer=%s, will try candidates now...", pinfo.Id)
				// evict missing peer
				rt.HandlePeerDead(pinfo.Id)

				// keep trying replacement candidates for the missing peer till we get a successful validation or
				// we run out of candidates
				cpl := uint(CommonPrefixLen(ConvertPeerID(pinfo.Id), rt.local))
				c, notEmpty := rt.cplReplacementCache.pop(cpl)
				for notEmpty {
					if validatePeerF(c) {
						log.Infof("successfully validated candidate=%s for missing peer=%s", c, pinfo.Id)
						break
					}
					log.Infof("failed to validated candidate=%s", c)
					// remove candidate
					rt.HandlePeerDead(c)

					c, notEmpty = rt.cplReplacementCache.pop(cpl)
				}

				if !notEmpty {
					log.Infof("failed to replace missing peer=%s as all candidates were invalid", pinfo.Id)
				}
			}
Aarsh Shah's avatar
Aarsh Shah committed
181 182
		case <-proc.Closing():
			return
183 184 185 186 187 188 189 190 191 192
		}
	}
}

// returns the peers that need to be validated.
func (rt *RoutingTable) peersToValidate() []PeerInfo {
	rt.tabLock.RLock()
	defer rt.tabLock.RUnlock()

	var peers []PeerInfo
Aarsh Shah's avatar
Aarsh Shah committed
193
	for _, b := range rt.buckets {
194 195 196
		peers = append(peers, b.peers()...)
	}
	return rt.peersForValidationFnc(peers)
197 198
}

Aarsh Shah's avatar
Aarsh Shah committed
199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226
// NPeersForCPL returns the number of peers we have for a given Cpl
func (rt *RoutingTable) NPeersForCpl(cpl uint) int {
	rt.tabLock.RLock()
	defer rt.tabLock.RUnlock()

	// it's in the last bucket
	if int(cpl) >= len(rt.buckets)-1 {
		count := 0
		b := rt.buckets[len(rt.buckets)-1]
		for _, p := range b.peerIds() {
			if CommonPrefixLen(rt.local, ConvertPeerID(p)) == int(cpl) {
				count++
			}
		}
		return count
	} else {
		return rt.buckets[cpl].len()
	}
}

// IsBucketFull returns true if the Logical bucket for a given Cpl is full
func (rt *RoutingTable) IsBucketFull(cpl uint) bool {
	rt.tabLock.RLock()
	defer rt.tabLock.RUnlock()

	return rt.NPeersForCpl(cpl) == rt.bucketsize
}

Aarsh Shah's avatar
Aarsh Shah committed
227 228
// GetTrackedCplsForRefresh returns the Cpl's we are tracking for refresh.
// Caller is free to modify the returned slice as it is a defensive copy.
Aarsh Shah's avatar
Aarsh Shah committed
229
func (rt *RoutingTable) GetTrackedCplsForRefresh() []CplRefresh {
Aarsh Shah's avatar
Aarsh Shah committed
230 231
	rt.cplRefreshLk.RLock()
	defer rt.cplRefreshLk.RUnlock()
Aarsh Shah's avatar
Aarsh Shah committed
232

Aarsh Shah's avatar
Aarsh Shah committed
233
	cpls := make([]CplRefresh, 0, len(rt.cplRefreshedAt))
Aarsh Shah's avatar
Aarsh Shah committed
234 235

	for c, t := range rt.cplRefreshedAt {
Aarsh Shah's avatar
Aarsh Shah committed
236
		cpls = append(cpls, CplRefresh{c, t})
237 238
	}

Aarsh Shah's avatar
Aarsh Shah committed
239 240 241 242 243
	return cpls
}

// GenRandPeerID generates a random peerID for a given Cpl
func (rt *RoutingTable) GenRandPeerID(targetCpl uint) (peer.ID, error) {
Aarsh Shah's avatar
Aarsh Shah committed
244 245
	if targetCpl > maxCplForRefresh {
		return "", fmt.Errorf("cannot generate peer ID for Cpl greater than %d", maxCplForRefresh)
246 247
	}

248
	localPrefix := binary.BigEndian.Uint16(rt.local)
Aarsh Shah's avatar
Aarsh Shah committed
249 250 251 252 253 254 255 256 257 258 259

	// For host with ID `L`, an ID `K` belongs to a bucket with ID `B` ONLY IF CommonPrefixLen(L,K) is EXACTLY B.
	// Hence, to achieve a targetPrefix `T`, we must toggle the (T+1)th bit in L & then copy (T+1) bits from L
	// to our randomly generated prefix.
	toggledLocalPrefix := localPrefix ^ (uint16(0x8000) >> targetCpl)
	randPrefix := uint16(rand.Uint32())

	// Combine the toggled local prefix and the random bits at the correct offset
	// such that ONLY the first `targetCpl` bits match the local ID.
	mask := (^uint16(0)) << (16 - (targetCpl + 1))
	targetPrefix := (toggledLocalPrefix & mask) | (randPrefix & ^mask)
260

261 262 263
	// Convert to a known peer ID.
	key := keyPrefixMap[targetPrefix]
	id := [34]byte{mh.SHA2_256, 32}
264
	binary.BigEndian.PutUint32(id[2:], key)
Aarsh Shah's avatar
Aarsh Shah committed
265
	return peer.ID(id[:]), nil
266 267
}

Aarsh Shah's avatar
Aarsh Shah committed
268 269
// ResetCplRefreshedAtForID resets the refresh time for the Cpl of the given ID.
func (rt *RoutingTable) ResetCplRefreshedAtForID(id ID, newTime time.Time) {
270
	cpl := CommonPrefixLen(id, rt.local)
Aarsh Shah's avatar
Aarsh Shah committed
271
	if uint(cpl) > maxCplForRefresh {
Aarsh Shah's avatar
Aarsh Shah committed
272
		return
273 274
	}

Aarsh Shah's avatar
Aarsh Shah committed
275 276 277 278
	rt.cplRefreshLk.Lock()
	defer rt.cplRefreshLk.Unlock()

	rt.cplRefreshedAt[uint(cpl)] = newTime
279 280
}

281 282 283
// HandlePeerDisconnect should be called when the caller detects a disconnection with the peer.
// This enables the Routing Table to mark the peer as missing.
func (rt *RoutingTable) HandlePeerDisconnect(p peer.ID) {
284 285
	rt.tabLock.Lock()
	defer rt.tabLock.Unlock()
286 287

	bucketId := rt.bucketIdForPeer(p)
Aarsh Shah's avatar
Aarsh Shah committed
288 289 290
	// mark the peer as missing
	b := rt.buckets[bucketId]
	if peer := b.getPeer(p); peer != nil {
291
		peer.State = PeerStateMissing
292
	}
293 294 295 296 297 298 299 300
}

// HandlePeerAlive should be called when the caller detects that a peer is alive.
// This could be a successful incoming/outgoing connection with the peer or even a successful message delivery to/from the peer.
// This enables the RT to update it's internal state to mark the peer as active.
func (rt *RoutingTable) HandlePeerAlive(p peer.ID) (evicted peer.ID, err error) {
	rt.tabLock.Lock()
	defer rt.tabLock.Unlock()
301

302
	bucketID := rt.bucketIdForPeer(p)
Aarsh Shah's avatar
Aarsh Shah committed
303 304 305 306
	bucket := rt.buckets[bucketID]
	if peer := bucket.getPeer(p); peer != nil {
		// mark the peer as active
		peer.State = PeerStateActive
307

308
		return "", nil
309 310 311 312
	}

	if rt.metrics.LatencyEWMA(p) > rt.maxLatency {
		// Connection doesnt meet requirements, skip!
313 314 315 316
		return "", ErrPeerRejectedHighLatency
	}

	// We have enough space in the bucket (whether spawned or grouped).
317
	if bucket.len() < rt.bucketsize {
Aarsh Shah's avatar
Aarsh Shah committed
318
		bucket.pushFront(&PeerInfo{p, PeerStateActive})
319 320
		rt.PeerAdded(p)
		return "", nil
321 322
	}

Aarsh Shah's avatar
Aarsh Shah committed
323
	if bucketID == len(rt.buckets)-1 {
324 325 326
		// if the bucket is too large and this is the last bucket (i.e. wildcard), unfold it.
		rt.nextBucket()
		// the structure of the table has changed, so let's recheck if the peer now has a dedicated bucket.
327
		bucketID = rt.bucketIdForPeer(p)
Aarsh Shah's avatar
Aarsh Shah committed
328
		bucket = rt.buckets[bucketID]
329 330 331

		// push the peer only if the bucket isn't overflowing after slitting
		if bucket.len() < rt.bucketsize {
Aarsh Shah's avatar
Aarsh Shah committed
332
			bucket.pushFront(&PeerInfo{p, PeerStateActive})
333 334
			rt.PeerAdded(p)
			return "", nil
335 336
		}
	}
337 338
	// try to push it as a candidate in the replacement cache
	rt.cplReplacementCache.push(p)
339 340

	return "", ErrPeerRejectedNoCapacity
341 342
}

343 344 345 346 347
// HandlePeerDead should be called when the caller is sure that a peer is dead/not dialable.
// It evicts the peer from the Routing Table and also removes it as a replacement candidate if it is one.
func (rt *RoutingTable) HandlePeerDead(p peer.ID) {
	// remove it as a candidate
	rt.cplReplacementCache.remove(p)
348

349
	// remove it from the RT
Steven Allen's avatar
Steven Allen committed
350 351
	rt.tabLock.Lock()
	defer rt.tabLock.Unlock()
352
	bucketID := rt.bucketIdForPeer(p)
Aarsh Shah's avatar
Aarsh Shah committed
353
	bucket := rt.buckets[bucketID]
354
	if bucket.remove(p) {
355 356
		rt.PeerRemoved(p)
	}
357 358
}

359
func (rt *RoutingTable) nextBucket() {
360 361 362
	// This is the last bucket, which allegedly is a mixed bag containing peers not belonging in dedicated (unfolded) buckets.
	// _allegedly_ is used here to denote that *all* peers in the last bucket might feasibly belong to another bucket.
	// This could happen if e.g. we've unfolded 4 buckets, and all peers in folded bucket 5 really belong in bucket 8.
Aarsh Shah's avatar
Aarsh Shah committed
363 364 365
	bucket := rt.buckets[len(rt.buckets)-1]
	newBucket := bucket.split(len(rt.buckets)-1, rt.local)
	rt.buckets = append(rt.buckets, newBucket)
366

367
	// The newly formed bucket still contains too many peers. We probably just unfolded a empty bucket.
368
	if newBucket.len() >= rt.bucketsize {
369 370
		// Keep unfolding the table until the last bucket is not overflowing.
		rt.nextBucket()
371 372 373
	}
}

Jeromy's avatar
Jeromy committed
374
// Find a specific peer by ID or return nil
375
func (rt *RoutingTable) Find(id peer.ID) peer.ID {
Chas Leichner's avatar
Chas Leichner committed
376
	srch := rt.NearestPeers(ConvertPeerID(id), 1)
377 378
	if len(srch) == 0 || srch[0] != id {
		return ""
Jeromy's avatar
Jeromy committed
379 380 381 382
	}
	return srch[0]
}

383
// NearestPeer returns a single peer that is nearest to the given ID
384
func (rt *RoutingTable) NearestPeer(id ID) peer.ID {
385 386 387 388
	peers := rt.NearestPeers(id, 1)
	if len(peers) > 0 {
		return peers[0]
	}
389

Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
390
	log.Debugf("NearestPeer: Returning nil, table size = %d", rt.Size())
391
	return ""
392 393
}

394
// NearestPeers returns a list of the 'count' closest peers to the given ID
395
func (rt *RoutingTable) NearestPeers(id ID, count int) []peer.ID {
396 397 398 399
	// This is the number of bits _we_ share with the key. All peers in this
	// bucket share cpl bits with us and will therefore share at least cpl+1
	// bits with the given key. +1 because both the target and all peers in
	// this bucket differ from us in the cpl bit.
Matt Joiner's avatar
Matt Joiner committed
400
	cpl := CommonPrefixLen(id, rt.local)
401

402
	// It's assumed that this also protects the buckets.
Jeromy's avatar
Jeromy committed
403 404
	rt.tabLock.RLock()

Aarsh Shah's avatar
Aarsh Shah committed
405 406 407
	// Get bucket index or last bucket
	if cpl >= len(rt.buckets) {
		cpl = len(rt.buckets) - 1
408 409
	}

410
	pds := peerDistanceSorter{
411
		peers:  make([]peerDistance, 0, count+rt.bucketsize),
412 413
		target: id,
	}
414

415
	// Add peers from the target bucket (cpl+1 shared bits).
Aarsh Shah's avatar
Aarsh Shah committed
416
	pds.appendPeersFromList(rt.buckets[cpl].list)
417

418 419 420
	// If we're short, add peers from buckets to the right until we have
	// enough. All buckets to the right share exactly cpl bits (as opposed
	// to the cpl+1 bits shared by the peers in the cpl bucket).
421 422 423 424 425
	//
	// Unfortunately, to be completely correct, we can't just take from
	// buckets until we have enough peers because peers because _all_ of
	// these peers will be ~2**(256-cpl) from us.
	//
426 427
	// However, we're going to do that anyways as it's "good enough"

Aarsh Shah's avatar
Aarsh Shah committed
428 429
	for i := cpl + 1; i < len(rt.buckets) && pds.Len() < count; i++ {
		pds.appendPeersFromList(rt.buckets[i].list)
430 431
	}

432 433 434 435
	// If we're still short, add in buckets that share _fewer_ bits. We can
	// do this bucket by bucket because each bucket will share 1 fewer bit
	// than the last.
	//
Aarsh Shah's avatar
fix doc  
Aarsh Shah committed
436 437
	// * bucket cpl-1: cpl-1 shared bits.
	// * bucket cpl-2: cpl-2 shared bits.
438
	// ...
439
	for i := cpl - 1; i >= 0 && pds.Len() < count; i-- {
Aarsh Shah's avatar
Aarsh Shah committed
440
		pds.appendPeersFromList(rt.buckets[i].list)
441
	}
Jeromy's avatar
Jeromy committed
442
	rt.tabLock.RUnlock()
443 444

	// Sort by distance to local peer
445
	pds.sort()
446

447 448
	if count < pds.Len() {
		pds.peers = pds.peers[:count]
449 450
	}

451 452
	out := make([]peer.ID, 0, pds.Len())
	for _, p := range pds.peers {
453
		out = append(out, p.p)
454 455 456 457 458
	}

	return out
}

459
// Size returns the total number of peers in the routing table
460 461
func (rt *RoutingTable) Size() int {
	var tot int
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
462
	rt.tabLock.RLock()
Aarsh Shah's avatar
Aarsh Shah committed
463
	for _, buck := range rt.buckets {
464
		tot += buck.len()
465
	}
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
466
	rt.tabLock.RUnlock()
467 468 469
	return tot
}

Chas Leichner's avatar
Chas Leichner committed
470
// ListPeers takes a RoutingTable and returns a list of all peers from all buckets in the table.
471 472
func (rt *RoutingTable) ListPeers() []peer.ID {
	var peers []peer.ID
473
	rt.tabLock.RLock()
Aarsh Shah's avatar
Aarsh Shah committed
474
	for _, buck := range rt.buckets {
475
		peers = append(peers, buck.peerIds()...)
476
	}
477
	rt.tabLock.RUnlock()
478 479
	return peers
}
480

Chas Leichner's avatar
Chas Leichner committed
481 482
// Print prints a descriptive statement about the provided RoutingTable
func (rt *RoutingTable) Print() {
483 484
	fmt.Printf("Routing Table, bs = %d, Max latency = %d\n", rt.bucketsize, rt.maxLatency)
	rt.tabLock.RLock()
485

Aarsh Shah's avatar
Aarsh Shah committed
486
	for i, b := range rt.buckets {
487 488 489
		fmt.Printf("\tbucket: %d\n", i)

		for e := b.list.Front(); e != nil; e = e.Next() {
Aarsh Shah's avatar
Aarsh Shah committed
490
			p := e.Value.(*PeerInfo).Id
491 492
			fmt.Printf("\t\t- %s %s\n", p.Pretty(), rt.metrics.LatencyEWMA(p).String())
		}
493
	}
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
494
	rt.tabLock.RUnlock()
495
}
496 497 498 499 500 501

// the caller is responsible for the locking
func (rt *RoutingTable) bucketIdForPeer(p peer.ID) int {
	peerID := ConvertPeerID(p)
	cpl := CommonPrefixLen(peerID, rt.local)
	bucketID := cpl
Aarsh Shah's avatar
Aarsh Shah committed
502 503
	if bucketID >= len(rt.buckets) {
		bucketID = len(rt.buckets) - 1
504 505 506
	}
	return bucketID
}