table.go 15.5 KB
Newer Older
1 2
// Package kbucket implements a Kademlia 'k-bucket' routing table.
package kbucket
3 4

import (
5
	"context"
6
	"errors"
7
	"fmt"
8
	"sync"
9
	"time"
10

11 12 13
	"github.com/libp2p/go-libp2p-core/peer"
	"github.com/libp2p/go-libp2p-core/peerstore"

14 15
	"github.com/libp2p/go-libp2p-kbucket/peerdiversity"

George Antoniadis's avatar
George Antoniadis committed
16
	logging "github.com/ipfs/go-log"
17 18
)

Jeromy's avatar
Jeromy committed
19
// log is this package's logger, created under the name "table".
var log = logging.Logger("table")
20

21 22 23
var (
	// ErrPeerRejectedHighLatency is returned when a peer's measured latency
	// exceeds the routing table's maximum acceptable latency.
	ErrPeerRejectedHighLatency = errors.New("peer rejected; latency too high")

	// ErrPeerRejectedNoCapacity is returned when the peer's bucket is full and
	// no replaceable peer could be evicted to make room for it.
	ErrPeerRejectedNoCapacity = errors.New("peer rejected; insufficient capacity")
)

24 25
// RoutingTable defines the routing table.
type RoutingTable struct {
26
	// the routing table context
27
	ctx context.Context
28 29 30
	// function to cancel the RT context
	ctxCancel context.CancelFunc

31 32 33 34 35 36
	// ID of the local peer
	local ID

	// Blanket lock, refine later for better performance
	tabLock sync.RWMutex

37
	// latency metrics
38
	metrics peerstore.Metrics
39

40 41 42
	// Maximum acceptable latency for peers in this cluster
	maxLatency time.Duration

43
	// kBuckets define all the fingers to other nodes.
Aarsh Shah's avatar
Aarsh Shah committed
44
	buckets    []*bucket
45
	bucketsize int
Jeromy's avatar
Jeromy committed
46

Aarsh Shah's avatar
Aarsh Shah committed
47 48 49
	cplRefreshLk   sync.RWMutex
	cplRefreshedAt map[uint]time.Time

Jeromy's avatar
Jeromy committed
50 51 52
	// notification functions
	PeerRemoved func(peer.ID)
	PeerAdded   func(peer.ID)
53

Aarsh Shah's avatar
Aarsh Shah committed
54
	// usefulnessGracePeriod is the maximum grace period we will give to a
55 56 57
	// peer in the bucket to be useful to us, failing which, we will evict
	// it to make place for a new peer if the bucket is full
	usefulnessGracePeriod time.Duration
58 59

	df *peerdiversity.Filter
60 61
}

Chas Leichner's avatar
Chas Leichner committed
62
// NewRoutingTable creates a new routing table with a given bucketsize, local ID, and latency tolerance.
63 64
func NewRoutingTable(bucketsize int, localID ID, latency time.Duration, m peerstore.Metrics, usefulnessGracePeriod time.Duration,
	df *peerdiversity.Filter) (*RoutingTable, error) {
Jeromy's avatar
Jeromy committed
65
	rt := &RoutingTable{
Aarsh Shah's avatar
Aarsh Shah committed
66
		buckets:    []*bucket{newBucket()},
67 68 69 70 71 72
		bucketsize: bucketsize,
		local:      localID,

		maxLatency: latency,
		metrics:    m,

Aarsh Shah's avatar
Aarsh Shah committed
73
		cplRefreshedAt: make(map[uint]time.Time),
74 75 76 77

		PeerRemoved: func(peer.ID) {},
		PeerAdded:   func(peer.ID) {},

Aarsh Shah's avatar
Aarsh Shah committed
78
		usefulnessGracePeriod: usefulnessGracePeriod,
79 80

		df: df,
Jeromy's avatar
Jeromy committed
81 82
	}

83
	rt.ctx, rt.ctxCancel = context.WithCancel(context.Background())
84 85 86 87

	return rt, nil
}

Aarsh Shah's avatar
Aarsh Shah committed
88 89 90
// Close shuts down the Routing Table & all associated processes.
// It is safe to call this multiple times.
func (rt *RoutingTable) Close() error {
91 92
	rt.ctxCancel()
	return nil
Aarsh Shah's avatar
Aarsh Shah committed
93 94
}

95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120
// NPeersForCpl returns the number of peers in the table whose common prefix
// length with the local peer's ID is exactly cpl.
func (rt *RoutingTable) NPeersForCpl(cpl uint) int {
	rt.tabLock.RLock()
	defer rt.tabLock.RUnlock()

	lastIdx := len(rt.buckets) - 1

	// Every bucket before the last one is dedicated: a peer lands in bucket i
	// only if its cpl is exactly i, so the bucket's size is the answer.
	if int(cpl) < lastIdx {
		return rt.buckets[cpl].len()
	}

	// The last (wildcard) bucket mixes peers with cpl >= lastIdx, so only
	// count the ones whose cpl matches.
	count := 0
	for _, p := range rt.buckets[lastIdx].peers() {
		if CommonPrefixLen(rt.local, p.dhtId) == int(cpl) {
			count++
		}
	}
	return count
}

// TryAddPeer tries to add a peer to the Routing table.
// If the peer ALREADY exists in the Routing Table and has been queried before, this call is a no-op.
// If the peer ALREADY exists in the Routing Table but hasn't been queried before, we set it's LastUsefulAt value to
// the current time. This needs to done because we don't mark peers as "Useful"(by setting the LastUsefulAt value)
// when we first connect to them.
//
Aarsh Shah's avatar
Aarsh Shah committed
121
// If the peer is a queryPeer i.e. we queried it or it queried us, we set the LastSuccessfulOutboundQuery to the current time.
122
// If the peer is just a peer that we connect to/it connected to us without any DHT query, we consider it as having
Aarsh Shah's avatar
Aarsh Shah committed
123
// no LastSuccessfulOutboundQuery.
124
//
125
//
126
// If the logical bucket to which the peer belongs is full and it's not the last bucket, we try to replace an existing peer
Aarsh Shah's avatar
Aarsh Shah committed
127
// whose LastSuccessfulOutboundQuery is above the maximum allowed threshold in that bucket with the new peer.
128 129 130 131 132 133 134
// If no such peer exists in that bucket, we do NOT add the peer to the Routing Table and return error "ErrPeerRejectedNoCapacity".

// It returns a boolean value set to true if the peer was newly added to the Routing Table, false otherwise.
// It also returns any error that occurred while adding the peer to the Routing Table. If the error is not nil,
// the boolean value will ALWAYS be false i.e. the peer wont be added to the Routing Table it it's not already there.
//
// A return value of false with error=nil indicates that the peer ALREADY exists in the Routing Table.
Aarsh Shah's avatar
Aarsh Shah committed
135
func (rt *RoutingTable) TryAddPeer(p peer.ID, queryPeer bool, isReplaceable bool) (bool, error) {
136 137
	rt.tabLock.Lock()
	defer rt.tabLock.Unlock()
138

Aarsh Shah's avatar
Aarsh Shah committed
139
	return rt.addPeer(p, queryPeer, isReplaceable)
140 141 142
}

// locking is the responsibility of the caller
Aarsh Shah's avatar
Aarsh Shah committed
143
func (rt *RoutingTable) addPeer(p peer.ID, queryPeer bool, isReplaceable bool) (bool, error) {
144
	bucketID := rt.bucketIdForPeer(p)
Aarsh Shah's avatar
Aarsh Shah committed
145
	bucket := rt.buckets[bucketID]
146 147

	now := time.Now()
Aarsh Shah's avatar
Aarsh Shah committed
148
	var lastUsefulAt time.Time
149
	if queryPeer {
150
		lastUsefulAt = now
151
	}
152

153 154
	// peer already exists in the Routing Table.
	if peer := bucket.getPeer(p); peer != nil {
155 156 157 158 159
		// if we're querying the peer first time after adding it, let's give it a
		// usefulness bump. This will ONLY happen once.
		if peer.LastUsefulAt.IsZero() && queryPeer {
			peer.LastUsefulAt = lastUsefulAt
		}
160
		return false, nil
161 162
	}

163
	// peer's latency threshold is NOT acceptable
164 165
	if rt.metrics.LatencyEWMA(p) > rt.maxLatency {
		// Connection doesnt meet requirements, skip!
166
		return false, ErrPeerRejectedHighLatency
167 168
	}

169 170 171 172 173 174 175 176 177
	// add it to the diversity filter for now.
	// if we aren't able to find a place for the peer in the table,
	// we will simply remove it from the Filter later.
	if rt.df != nil {
		if !rt.df.TryAdd(p) {
			return false, errors.New("peer rejected by the diversity filter")
		}
	}

178
	// We have enough space in the bucket (whether spawned or grouped).
179
	if bucket.len() < rt.bucketsize {
180 181 182 183 184 185
		bucket.pushFront(&PeerInfo{
			Id:                            p,
			LastUsefulAt:                  lastUsefulAt,
			LastSuccessfulOutboundQueryAt: now,
			AddedAt:                       now,
			dhtId:                         ConvertPeerID(p),
Aarsh Shah's avatar
Aarsh Shah committed
186
			replaceable:                   isReplaceable,
187
		})
188
		rt.PeerAdded(p)
189
		return true, nil
190 191
	}

Aarsh Shah's avatar
Aarsh Shah committed
192
	if bucketID == len(rt.buckets)-1 {
193 194 195
		// if the bucket is too large and this is the last bucket (i.e. wildcard), unfold it.
		rt.nextBucket()
		// the structure of the table has changed, so let's recheck if the peer now has a dedicated bucket.
196
		bucketID = rt.bucketIdForPeer(p)
Aarsh Shah's avatar
Aarsh Shah committed
197
		bucket = rt.buckets[bucketID]
198 199 200

		// push the peer only if the bucket isn't overflowing after slitting
		if bucket.len() < rt.bucketsize {
201 202 203 204 205 206
			bucket.pushFront(&PeerInfo{
				Id:                            p,
				LastUsefulAt:                  lastUsefulAt,
				LastSuccessfulOutboundQueryAt: now,
				AddedAt:                       now,
				dhtId:                         ConvertPeerID(p),
Aarsh Shah's avatar
Aarsh Shah committed
207
				replaceable:                   isReplaceable,
208
			})
209
			rt.PeerAdded(p)
210 211 212 213 214
			return true, nil
		}
	}

	// the bucket to which the peer belongs is full. Let's try to find a peer
Aarsh Shah's avatar
Aarsh Shah committed
215 216 217 218 219
	// in that bucket which is replaceable.
	// we don't really need a stable sort here as it dosen't matter which peer we evict
	// as long as it's a replaceable peer.
	replaceablePeer := bucket.min(func(p1 *PeerInfo, p2 *PeerInfo) bool {
		return p1.replaceable
Aarsh Shah's avatar
Aarsh Shah committed
220 221
	})

Aarsh Shah's avatar
Aarsh Shah committed
222
	if replaceablePeer != nil && replaceablePeer.replaceable {
Aarsh Shah's avatar
Aarsh Shah committed
223
		// let's evict it and add the new peer
Aarsh Shah's avatar
Aarsh Shah committed
224
		if rt.removePeer(replaceablePeer.Id) {
225 226 227 228 229 230
			bucket.pushFront(&PeerInfo{
				Id:                            p,
				LastUsefulAt:                  lastUsefulAt,
				LastSuccessfulOutboundQueryAt: now,
				AddedAt:                       now,
				dhtId:                         ConvertPeerID(p),
Aarsh Shah's avatar
Aarsh Shah committed
231
				replaceable:                   isReplaceable,
232
			})
Aarsh Shah's avatar
Aarsh Shah committed
233 234
			rt.PeerAdded(p)
			return true, nil
235 236
		}
	}
237

238 239 240 241
	// we weren't able to find place for the peer, remove it from the filter state.
	if rt.df != nil {
		rt.df.Remove(p)
	}
242 243 244
	return false, ErrPeerRejectedNoCapacity
}

Aarsh Shah's avatar
Aarsh Shah committed
245 246 247 248 249 250 251 252 253 254 255 256 257 258 259
// MarkAllPeersIrreplaceable flags every peer currently in the routing table as
// irreplaceable. None of them will then be evicted to make room for a new peer,
// although they can still be removed explicitly via the `RemovePeer` API.
func (rt *RoutingTable) MarkAllPeersIrreplaceable() {
	rt.tabLock.Lock()
	defer rt.tabLock.Unlock()

	markIrreplaceable := func(pi *PeerInfo) { pi.replaceable = false }
	for _, b := range rt.buckets {
		b.updateAllWith(markIrreplaceable)
	}
}

Aarsh Shah's avatar
Aarsh Shah committed
260 261 262 263 264 265 266
// GetPeerInfos returns the peer information that we've stored in the buckets
func (rt *RoutingTable) GetPeerInfos() []PeerInfo {
	rt.tabLock.RLock()
	defer rt.tabLock.RUnlock()

	var pis []PeerInfo
	for _, b := range rt.buckets {
Marten Seemann's avatar
Marten Seemann committed
267
		pis = append(pis, b.peers()...)
Aarsh Shah's avatar
Aarsh Shah committed
268 269 270 271
	}
	return pis
}

Aarsh Shah's avatar
Aarsh Shah committed
272
// UpdateLastSuccessfulOutboundQuery updates the LastSuccessfulOutboundQueryAt time of the peer.
273
// Returns true if the update was successful, false otherwise.
Aarsh Shah's avatar
Aarsh Shah committed
274
func (rt *RoutingTable) UpdateLastSuccessfulOutboundQueryAt(p peer.ID, t time.Time) bool {
275 276 277 278 279 280 281
	rt.tabLock.Lock()
	defer rt.tabLock.Unlock()

	bucketID := rt.bucketIdForPeer(p)
	bucket := rt.buckets[bucketID]

	if pc := bucket.getPeer(p); pc != nil {
Aarsh Shah's avatar
Aarsh Shah committed
282 283 284 285 286 287 288 289 290 291 292 293 294 295 296 297 298
		pc.LastSuccessfulOutboundQueryAt = t
		return true
	}
	return false
}

// UpdateLastUsefulAt updates the LastUsefulAt time of the peer.
// Returns true if the update was successful, false otherwise.
func (rt *RoutingTable) UpdateLastUsefulAt(p peer.ID, t time.Time) bool {
	rt.tabLock.Lock()
	defer rt.tabLock.Unlock()

	bucketID := rt.bucketIdForPeer(p)
	bucket := rt.buckets[bucketID]

	if pc := bucket.getPeer(p); pc != nil {
		pc.LastUsefulAt = t
299 300 301
		return true
	}
	return false
302 303
}

304 305 306 307
// RemovePeer should be called when the caller is sure that a peer is not useful for queries.
// For eg: the peer could have stopped supporting the DHT protocol.
// It evicts the peer from the Routing Table.
func (rt *RoutingTable) RemovePeer(p peer.ID) {
Steven Allen's avatar
Steven Allen committed
308 309
	rt.tabLock.Lock()
	defer rt.tabLock.Unlock()
310 311 312 313
	rt.removePeer(p)
}

// locking is the responsibility of the caller
Aarsh Shah's avatar
Aarsh Shah committed
314
func (rt *RoutingTable) removePeer(p peer.ID) bool {
315
	bucketID := rt.bucketIdForPeer(p)
Aarsh Shah's avatar
Aarsh Shah committed
316
	bucket := rt.buckets[bucketID]
317
	if bucket.remove(p) {
318 319 320
		if rt.df != nil {
			rt.df.Remove(p)
		}
321 322 323 324 325 326 327 328 329 330 331 332 333 334 335 336 337
		for {
			lastBucketIndex := len(rt.buckets) - 1

			// remove the last bucket if it's empty and it isn't the only bucket we have
			if len(rt.buckets) > 1 && rt.buckets[lastBucketIndex].len() == 0 {
				rt.buckets[lastBucketIndex] = nil
				rt.buckets = rt.buckets[:lastBucketIndex]
			} else if len(rt.buckets) >= 2 && rt.buckets[lastBucketIndex-1].len() == 0 {
				// if the second last bucket just became empty, remove and replace it with the last bucket.
				rt.buckets[lastBucketIndex-1] = rt.buckets[lastBucketIndex]
				rt.buckets[lastBucketIndex] = nil
				rt.buckets = rt.buckets[:lastBucketIndex]
			} else {
				break
			}
		}

338
		// peer removed callback
339
		rt.PeerRemoved(p)
Aarsh Shah's avatar
Aarsh Shah committed
340
		return true
341
	}
Aarsh Shah's avatar
Aarsh Shah committed
342
	return false
343 344
}

345
func (rt *RoutingTable) nextBucket() {
346 347 348
	// This is the last bucket, which allegedly is a mixed bag containing peers not belonging in dedicated (unfolded) buckets.
	// _allegedly_ is used here to denote that *all* peers in the last bucket might feasibly belong to another bucket.
	// This could happen if e.g. we've unfolded 4 buckets, and all peers in folded bucket 5 really belong in bucket 8.
Aarsh Shah's avatar
Aarsh Shah committed
349 350 351
	bucket := rt.buckets[len(rt.buckets)-1]
	newBucket := bucket.split(len(rt.buckets)-1, rt.local)
	rt.buckets = append(rt.buckets, newBucket)
352

353
	// The newly formed bucket still contains too many peers. We probably just unfolded a empty bucket.
354
	if newBucket.len() >= rt.bucketsize {
355 356
		// Keep unfolding the table until the last bucket is not overflowing.
		rt.nextBucket()
357 358 359
	}
}

Jeromy's avatar
Jeromy committed
360
// Find a specific peer by ID or return nil
361
func (rt *RoutingTable) Find(id peer.ID) peer.ID {
Chas Leichner's avatar
Chas Leichner committed
362
	srch := rt.NearestPeers(ConvertPeerID(id), 1)
363 364
	if len(srch) == 0 || srch[0] != id {
		return ""
Jeromy's avatar
Jeromy committed
365 366 367 368
	}
	return srch[0]
}

369
// NearestPeer returns a single peer that is nearest to the given ID
370
func (rt *RoutingTable) NearestPeer(id ID) peer.ID {
371 372 373 374
	peers := rt.NearestPeers(id, 1)
	if len(peers) > 0 {
		return peers[0]
	}
375

Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
376
	log.Debugf("NearestPeer: Returning nil, table size = %d", rt.Size())
377
	return ""
378 379
}

380
// NearestPeers returns a list of the 'count' closest peers to the given ID
381
func (rt *RoutingTable) NearestPeers(id ID, count int) []peer.ID {
382 383 384 385
	// This is the number of bits _we_ share with the key. All peers in this
	// bucket share cpl bits with us and will therefore share at least cpl+1
	// bits with the given key. +1 because both the target and all peers in
	// this bucket differ from us in the cpl bit.
Matt Joiner's avatar
Matt Joiner committed
386
	cpl := CommonPrefixLen(id, rt.local)
387

388
	// It's assumed that this also protects the buckets.
Jeromy's avatar
Jeromy committed
389 390
	rt.tabLock.RLock()

Aarsh Shah's avatar
Aarsh Shah committed
391 392 393
	// Get bucket index or last bucket
	if cpl >= len(rt.buckets) {
		cpl = len(rt.buckets) - 1
394 395
	}

396
	pds := peerDistanceSorter{
397
		peers:  make([]peerDistance, 0, count+rt.bucketsize),
398 399
		target: id,
	}
400

401
	// Add peers from the target bucket (cpl+1 shared bits).
Aarsh Shah's avatar
Aarsh Shah committed
402
	pds.appendPeersFromList(rt.buckets[cpl].list)
403

Steven Allen's avatar
Steven Allen committed
404 405 406
	// If we're short, add peers from all buckets to the right. All buckets
	// to the right share exactly cpl bits (as opposed to the cpl+1 bits
	// shared by the peers in the cpl bucket).
407
	//
Steven Allen's avatar
Steven Allen committed
408 409 410
	// This is, unfortunately, less efficient than we'd like. We will switch
	// to a trie implementation eventually which will allow us to find the
	// closest N peers to any target key.
411

Steven Allen's avatar
Steven Allen committed
412 413 414 415
	if pds.Len() < count {
		for i := cpl + 1; i < len(rt.buckets); i++ {
			pds.appendPeersFromList(rt.buckets[i].list)
		}
416 417
	}

418 419 420 421
	// If we're still short, add in buckets that share _fewer_ bits. We can
	// do this bucket by bucket because each bucket will share 1 fewer bit
	// than the last.
	//
Aarsh Shah's avatar
fix doc  
Aarsh Shah committed
422 423
	// * bucket cpl-1: cpl-1 shared bits.
	// * bucket cpl-2: cpl-2 shared bits.
424
	// ...
425
	for i := cpl - 1; i >= 0 && pds.Len() < count; i-- {
Aarsh Shah's avatar
Aarsh Shah committed
426
		pds.appendPeersFromList(rt.buckets[i].list)
427
	}
Jeromy's avatar
Jeromy committed
428
	rt.tabLock.RUnlock()
429 430

	// Sort by distance to local peer
431
	pds.sort()
432

433 434
	if count < pds.Len() {
		pds.peers = pds.peers[:count]
435 436
	}

437 438
	out := make([]peer.ID, 0, pds.Len())
	for _, p := range pds.peers {
439
		out = append(out, p.p)
440 441 442 443 444
	}

	return out
}

445
// Size returns the total number of peers in the routing table
446 447
func (rt *RoutingTable) Size() int {
	var tot int
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
448
	rt.tabLock.RLock()
Aarsh Shah's avatar
Aarsh Shah committed
449
	for _, buck := range rt.buckets {
450
		tot += buck.len()
451
	}
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
452
	rt.tabLock.RUnlock()
453 454 455
	return tot
}

Chas Leichner's avatar
Chas Leichner committed
456
// ListPeers takes a RoutingTable and returns a list of all peers from all buckets in the table.
457
func (rt *RoutingTable) ListPeers() []peer.ID {
458
	rt.tabLock.RLock()
459 460 461
	defer rt.tabLock.RUnlock()

	var peers []peer.ID
Aarsh Shah's avatar
Aarsh Shah committed
462
	for _, buck := range rt.buckets {
463
		peers = append(peers, buck.peerIds()...)
464 465 466
	}
	return peers
}
467

Chas Leichner's avatar
Chas Leichner committed
468 469
// Print prints a descriptive statement about the provided RoutingTable
func (rt *RoutingTable) Print() {
470 471
	fmt.Printf("Routing Table, bs = %d, Max latency = %d\n", rt.bucketsize, rt.maxLatency)
	rt.tabLock.RLock()
472

Aarsh Shah's avatar
Aarsh Shah committed
473
	for i, b := range rt.buckets {
474 475 476
		fmt.Printf("\tbucket: %d\n", i)

		for e := b.list.Front(); e != nil; e = e.Next() {
Aarsh Shah's avatar
Aarsh Shah committed
477
			p := e.Value.(*PeerInfo).Id
478 479
			fmt.Printf("\t\t- %s %s\n", p.Pretty(), rt.metrics.LatencyEWMA(p).String())
		}
480
	}
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
481
	rt.tabLock.RUnlock()
482
}
483

484 485 486 487 488 489 490 491 492
// GetDiversityStats returns the diversity statistics of the table's diversity
// filter, or nil when no filter is configured.
func (rt *RoutingTable) GetDiversityStats() []peerdiversity.CplDiversityStats {
	if rt.df == nil {
		return nil
	}
	return rt.df.GetDiversityStats()
}

493 494 495 496 497
// the caller is responsible for the locking
func (rt *RoutingTable) bucketIdForPeer(p peer.ID) int {
	peerID := ConvertPeerID(p)
	cpl := CommonPrefixLen(peerID, rt.local)
	bucketID := cpl
Aarsh Shah's avatar
Aarsh Shah committed
498 499
	if bucketID >= len(rt.buckets) {
		bucketID = len(rt.buckets) - 1
500 501 502
	}
	return bucketID
}
503 504 505 506 507 508 509 510 511 512 513 514 515 516

// maxCommonPrefix returns the longest common prefix length between the local
// peer and any peer in the table. It is taken from the highest-indexed
// non-empty bucket, and is 0 when the table holds no peers.
func (rt *RoutingTable) maxCommonPrefix() uint {
	rt.tabLock.RLock()
	defer rt.tabLock.RUnlock()

	for i := len(rt.buckets); i > 0; i-- {
		if b := rt.buckets[i-1]; b.len() > 0 {
			return b.maxCommonPrefix(rt.local)
		}
	}
	return 0
}