// Package kbucket implements a Kademlia 'k-bucket' routing table.
package kbucket

import (
	"context"
	"errors"
	"fmt"
	"sync"
	"time"

	logging "github.com/ipfs/go-log"
	"github.com/libp2p/go-libp2p-core/peer"
	"github.com/libp2p/go-libp2p-core/peerstore"
)

Jeromy's avatar
Jeromy committed
17
var log = logging.Logger("table")
18

// Sentinel errors returned when a peer cannot be admitted to the routing table.
var (
	// ErrPeerRejectedHighLatency means the peer's measured latency exceeds
	// the table's maximum acceptable latency.
	ErrPeerRejectedHighLatency = errors.New("peer rejected; latency too high")

	// ErrPeerRejectedNoCapacity means the peer's bucket is full and holds no
	// peer that may be evicted to make room.
	ErrPeerRejectedNoCapacity = errors.New("peer rejected; insufficient capacity")
)

22 23
// RoutingTable defines the routing table.
type RoutingTable struct {
24
	// the routing table context
25
	ctx context.Context
26 27 28
	// function to cancel the RT context
	ctxCancel context.CancelFunc

29 30 31 32 33 34
	// ID of the local peer
	local ID

	// Blanket lock, refine later for better performance
	tabLock sync.RWMutex

35
	// latency metrics
36
	metrics peerstore.Metrics
37

38 39 40
	// Maximum acceptable latency for peers in this cluster
	maxLatency time.Duration

41
	// kBuckets define all the fingers to other nodes.
Aarsh Shah's avatar
Aarsh Shah committed
42
	buckets    []*bucket
43
	bucketsize int
Jeromy's avatar
Jeromy committed
44

Aarsh Shah's avatar
Aarsh Shah committed
45 46 47
	cplRefreshLk   sync.RWMutex
	cplRefreshedAt map[uint]time.Time

Jeromy's avatar
Jeromy committed
48 49 50
	// notification functions
	PeerRemoved func(peer.ID)
	PeerAdded   func(peer.ID)
51

Aarsh Shah's avatar
Aarsh Shah committed
52
	// maxLastSuccessfulOutboundThreshold is the max threshold/upper limit for the value of "LastSuccessfulOutboundQuery"
53 54 55
	// of the peer in the bucket above which we will evict it to make place for a new peer if the bucket
	// is full
	maxLastSuccessfulOutboundThreshold float64
56 57
}

Chas Leichner's avatar
Chas Leichner committed
58
// NewRoutingTable creates a new routing table with a given bucketsize, local ID, and latency tolerance.
Aarsh Shah's avatar
Aarsh Shah committed
59
// Passing a nil PeerValidationFunc disables periodic table cleanup.
60
func NewRoutingTable(bucketsize int, localID ID, latency time.Duration, m peerstore.Metrics, maxLastSuccessfulOutboundThreshold float64) (*RoutingTable, error) {
Jeromy's avatar
Jeromy committed
61
	rt := &RoutingTable{
Aarsh Shah's avatar
Aarsh Shah committed
62
		buckets:    []*bucket{newBucket()},
63 64 65 66 67 68
		bucketsize: bucketsize,
		local:      localID,

		maxLatency: latency,
		metrics:    m,

Aarsh Shah's avatar
Aarsh Shah committed
69
		cplRefreshedAt: make(map[uint]time.Time),
70 71 72 73

		PeerRemoved: func(peer.ID) {},
		PeerAdded:   func(peer.ID) {},

74
		maxLastSuccessfulOutboundThreshold: maxLastSuccessfulOutboundThreshold,
Jeromy's avatar
Jeromy committed
75 76
	}

77
	rt.ctx, rt.ctxCancel = context.WithCancel(context.Background())
78 79 80 81

	return rt, nil
}

Aarsh Shah's avatar
Aarsh Shah committed
82 83 84
// Close shuts down the Routing Table & all associated processes.
// It is safe to call this multiple times.
func (rt *RoutingTable) Close() error {
85 86
	rt.ctxCancel()
	return nil
Aarsh Shah's avatar
Aarsh Shah committed
87 88
}

89
// TryAddPeer tries to add a peer to the Routing table. If the peer ALREADY exists in the Routing Table, this call is a no-op.
Aarsh Shah's avatar
Aarsh Shah committed
90
// If the peer is a queryPeer i.e. we queried it or it queried us, we set the LastSuccessfulOutboundQuery to the current time.
91
// If the peer is just a peer that we connect to/it connected to us without any DHT query, we consider it as having
Aarsh Shah's avatar
Aarsh Shah committed
92
// no LastSuccessfulOutboundQuery.
93 94
//
// If the logical bucket to which the peer belongs is full and it's not the last bucket, we try to replace an existing peer
Aarsh Shah's avatar
Aarsh Shah committed
95
// whose LastSuccessfulOutboundQuery is above the maximum allowed threshold in that bucket with the new peer.
96 97 98 99 100 101 102 103
// If no such peer exists in that bucket, we do NOT add the peer to the Routing Table and return error "ErrPeerRejectedNoCapacity".

// It returns a boolean value set to true if the peer was newly added to the Routing Table, false otherwise.
// It also returns any error that occurred while adding the peer to the Routing Table. If the error is not nil,
// the boolean value will ALWAYS be false i.e. the peer wont be added to the Routing Table it it's not already there.
//
// A return value of false with error=nil indicates that the peer ALREADY exists in the Routing Table.
func (rt *RoutingTable) TryAddPeer(p peer.ID, queryPeer bool) (bool, error) {
104 105
	rt.tabLock.Lock()
	defer rt.tabLock.Unlock()
106

107
	return rt.addPeer(p, queryPeer)
108 109 110
}

// locking is the responsibility of the caller
111
func (rt *RoutingTable) addPeer(p peer.ID, queryPeer bool) (bool, error) {
112
	bucketID := rt.bucketIdForPeer(p)
Aarsh Shah's avatar
Aarsh Shah committed
113
	bucket := rt.buckets[bucketID]
114 115 116 117
	var lastSuccessfulOutboundQuery time.Time
	if queryPeer {
		lastSuccessfulOutboundQuery = time.Now()
	}
118

119 120 121
	// peer already exists in the Routing Table.
	if peer := bucket.getPeer(p); peer != nil {
		return false, nil
122 123
	}

124
	// peer's latency threshold is NOT acceptable
125 126
	if rt.metrics.LatencyEWMA(p) > rt.maxLatency {
		// Connection doesnt meet requirements, skip!
127
		return false, ErrPeerRejectedHighLatency
128 129 130
	}

	// We have enough space in the bucket (whether spawned or grouped).
131
	if bucket.len() < rt.bucketsize {
Aarsh Shah's avatar
Aarsh Shah committed
132
		bucket.pushFront(&PeerInfo{p, lastSuccessfulOutboundQuery, ConvertPeerID(p)})
133
		rt.PeerAdded(p)
134
		return true, nil
135 136
	}

Aarsh Shah's avatar
Aarsh Shah committed
137
	if bucketID == len(rt.buckets)-1 {
138 139 140
		// if the bucket is too large and this is the last bucket (i.e. wildcard), unfold it.
		rt.nextBucket()
		// the structure of the table has changed, so let's recheck if the peer now has a dedicated bucket.
141
		bucketID = rt.bucketIdForPeer(p)
Aarsh Shah's avatar
Aarsh Shah committed
142
		bucket = rt.buckets[bucketID]
143 144 145

		// push the peer only if the bucket isn't overflowing after slitting
		if bucket.len() < rt.bucketsize {
Aarsh Shah's avatar
Aarsh Shah committed
146
			bucket.pushFront(&PeerInfo{p, lastSuccessfulOutboundQuery, ConvertPeerID(p)})
147
			rt.PeerAdded(p)
148 149 150 151 152
			return true, nil
		}
	}

	// the bucket to which the peer belongs is full. Let's try to find a peer
Aarsh Shah's avatar
Aarsh Shah committed
153
	// in that bucket with a LastSuccessfulOutboundQuery value above the maximum threshold and replace it.
154 155
	allPeers := bucket.peers()
	for _, pc := range allPeers {
Aarsh Shah's avatar
Aarsh Shah committed
156
		if float64(time.Since(pc.LastSuccessfulOutboundQuery)) > rt.maxLastSuccessfulOutboundThreshold {
157 158
			// let's evict it and add the new peer
			if bucket.remove(pc.Id) {
Aarsh Shah's avatar
Aarsh Shah committed
159
				bucket.pushFront(&PeerInfo{p, lastSuccessfulOutboundQuery, ConvertPeerID(p)})
160 161 162
				rt.PeerAdded(p)
				return true, nil
			}
163 164
		}
	}
165

166 167 168
	return false, ErrPeerRejectedNoCapacity
}

Aarsh Shah's avatar
Aarsh Shah committed
169 170 171 172 173 174 175 176 177 178 179 180 181 182
// GetPeerInfos returns the peer information that we've stored in the buckets
func (rt *RoutingTable) GetPeerInfos() []PeerInfo {
	rt.tabLock.RLock()
	defer rt.tabLock.RUnlock()

	var pis []PeerInfo
	for _, b := range rt.buckets {
		for _, p := range b.peers() {
			pis = append(pis, p)
		}
	}
	return pis
}

Aarsh Shah's avatar
Aarsh Shah committed
183
// UpdateLastSuccessfulOutboundQuery updates the LastSuccessfulOutboundQuery time of the peer
184 185 186 187 188 189 190 191 192
// Returns true if the update was successful, false otherwise.
func (rt *RoutingTable) UpdateLastSuccessfulOutboundQuery(p peer.ID, t time.Time) bool {
	rt.tabLock.Lock()
	defer rt.tabLock.Unlock()

	bucketID := rt.bucketIdForPeer(p)
	bucket := rt.buckets[bucketID]

	if pc := bucket.getPeer(p); pc != nil {
Aarsh Shah's avatar
Aarsh Shah committed
193
		pc.LastSuccessfulOutboundQuery = t
194 195 196
		return true
	}
	return false
197 198
}

199 200 201 202
// RemovePeer should be called when the caller is sure that a peer is not useful for queries.
// For eg: the peer could have stopped supporting the DHT protocol.
// It evicts the peer from the Routing Table.
func (rt *RoutingTable) RemovePeer(p peer.ID) {
Steven Allen's avatar
Steven Allen committed
203 204
	rt.tabLock.Lock()
	defer rt.tabLock.Unlock()
205 206 207 208 209
	rt.removePeer(p)
}

// locking is the responsibility of the caller
func (rt *RoutingTable) removePeer(p peer.ID) {
210
	bucketID := rt.bucketIdForPeer(p)
Aarsh Shah's avatar
Aarsh Shah committed
211
	bucket := rt.buckets[bucketID]
212
	if bucket.remove(p) {
213
		// peer removed callback
214
		rt.PeerRemoved(p)
215
		return
216
	}
217 218
}

219
func (rt *RoutingTable) nextBucket() {
220 221 222
	// This is the last bucket, which allegedly is a mixed bag containing peers not belonging in dedicated (unfolded) buckets.
	// _allegedly_ is used here to denote that *all* peers in the last bucket might feasibly belong to another bucket.
	// This could happen if e.g. we've unfolded 4 buckets, and all peers in folded bucket 5 really belong in bucket 8.
Aarsh Shah's avatar
Aarsh Shah committed
223 224 225
	bucket := rt.buckets[len(rt.buckets)-1]
	newBucket := bucket.split(len(rt.buckets)-1, rt.local)
	rt.buckets = append(rt.buckets, newBucket)
226

227
	// The newly formed bucket still contains too many peers. We probably just unfolded a empty bucket.
228
	if newBucket.len() >= rt.bucketsize {
229 230
		// Keep unfolding the table until the last bucket is not overflowing.
		rt.nextBucket()
231 232 233
	}
}

Jeromy's avatar
Jeromy committed
234
// Find a specific peer by ID or return nil
235
func (rt *RoutingTable) Find(id peer.ID) peer.ID {
Chas Leichner's avatar
Chas Leichner committed
236
	srch := rt.NearestPeers(ConvertPeerID(id), 1)
237 238
	if len(srch) == 0 || srch[0] != id {
		return ""
Jeromy's avatar
Jeromy committed
239 240 241 242
	}
	return srch[0]
}

243
// NearestPeer returns a single peer that is nearest to the given ID
244
func (rt *RoutingTable) NearestPeer(id ID) peer.ID {
245 246 247 248
	peers := rt.NearestPeers(id, 1)
	if len(peers) > 0 {
		return peers[0]
	}
249

Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
250
	log.Debugf("NearestPeer: Returning nil, table size = %d", rt.Size())
251
	return ""
252 253
}

254
// NearestPeers returns a list of the 'count' closest peers to the given ID
255
func (rt *RoutingTable) NearestPeers(id ID, count int) []peer.ID {
256 257 258 259
	// This is the number of bits _we_ share with the key. All peers in this
	// bucket share cpl bits with us and will therefore share at least cpl+1
	// bits with the given key. +1 because both the target and all peers in
	// this bucket differ from us in the cpl bit.
Matt Joiner's avatar
Matt Joiner committed
260
	cpl := CommonPrefixLen(id, rt.local)
261

262
	// It's assumed that this also protects the buckets.
Jeromy's avatar
Jeromy committed
263 264
	rt.tabLock.RLock()

Aarsh Shah's avatar
Aarsh Shah committed
265 266 267
	// Get bucket index or last bucket
	if cpl >= len(rt.buckets) {
		cpl = len(rt.buckets) - 1
268 269
	}

270
	pds := peerDistanceSorter{
271
		peers:  make([]peerDistance, 0, count+rt.bucketsize),
272 273
		target: id,
	}
274

275
	// Add peers from the target bucket (cpl+1 shared bits).
Aarsh Shah's avatar
Aarsh Shah committed
276
	pds.appendPeersFromList(rt.buckets[cpl].list)
277

Steven Allen's avatar
Steven Allen committed
278 279 280
	// If we're short, add peers from all buckets to the right. All buckets
	// to the right share exactly cpl bits (as opposed to the cpl+1 bits
	// shared by the peers in the cpl bucket).
281
	//
Steven Allen's avatar
Steven Allen committed
282 283 284
	// This is, unfortunately, less efficient than we'd like. We will switch
	// to a trie implementation eventually which will allow us to find the
	// closest N peers to any target key.
285

Steven Allen's avatar
Steven Allen committed
286 287 288 289
	if pds.Len() < count {
		for i := cpl + 1; i < len(rt.buckets); i++ {
			pds.appendPeersFromList(rt.buckets[i].list)
		}
290 291
	}

292 293 294 295
	// If we're still short, add in buckets that share _fewer_ bits. We can
	// do this bucket by bucket because each bucket will share 1 fewer bit
	// than the last.
	//
Aarsh Shah's avatar
fix doc  
Aarsh Shah committed
296 297
	// * bucket cpl-1: cpl-1 shared bits.
	// * bucket cpl-2: cpl-2 shared bits.
298
	// ...
299
	for i := cpl - 1; i >= 0 && pds.Len() < count; i-- {
Aarsh Shah's avatar
Aarsh Shah committed
300
		pds.appendPeersFromList(rt.buckets[i].list)
301
	}
Jeromy's avatar
Jeromy committed
302
	rt.tabLock.RUnlock()
303 304

	// Sort by distance to local peer
305
	pds.sort()
306

307 308
	if count < pds.Len() {
		pds.peers = pds.peers[:count]
309 310
	}

311 312
	out := make([]peer.ID, 0, pds.Len())
	for _, p := range pds.peers {
313
		out = append(out, p.p)
314 315 316 317 318
	}

	return out
}

319
// Size returns the total number of peers in the routing table
320 321
func (rt *RoutingTable) Size() int {
	var tot int
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
322
	rt.tabLock.RLock()
Aarsh Shah's avatar
Aarsh Shah committed
323
	for _, buck := range rt.buckets {
324
		tot += buck.len()
325
	}
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
326
	rt.tabLock.RUnlock()
327 328 329
	return tot
}

Chas Leichner's avatar
Chas Leichner committed
330
// ListPeers takes a RoutingTable and returns a list of all peers from all buckets in the table.
331
func (rt *RoutingTable) ListPeers() []peer.ID {
332
	rt.tabLock.RLock()
333 334 335
	defer rt.tabLock.RUnlock()

	var peers []peer.ID
Aarsh Shah's avatar
Aarsh Shah committed
336
	for _, buck := range rt.buckets {
337
		peers = append(peers, buck.peerIds()...)
338 339 340
	}
	return peers
}
341

Chas Leichner's avatar
Chas Leichner committed
342 343
// Print prints a descriptive statement about the provided RoutingTable
func (rt *RoutingTable) Print() {
344 345
	fmt.Printf("Routing Table, bs = %d, Max latency = %d\n", rt.bucketsize, rt.maxLatency)
	rt.tabLock.RLock()
346

Aarsh Shah's avatar
Aarsh Shah committed
347
	for i, b := range rt.buckets {
348 349 350
		fmt.Printf("\tbucket: %d\n", i)

		for e := b.list.Front(); e != nil; e = e.Next() {
Aarsh Shah's avatar
Aarsh Shah committed
351
			p := e.Value.(*PeerInfo).Id
352 353
			fmt.Printf("\t\t- %s %s\n", p.Pretty(), rt.metrics.LatencyEWMA(p).String())
		}
354
	}
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
355
	rt.tabLock.RUnlock()
356
}
357 358 359 360 361 362

// the caller is responsible for the locking
func (rt *RoutingTable) bucketIdForPeer(p peer.ID) int {
	peerID := ConvertPeerID(p)
	cpl := CommonPrefixLen(peerID, rt.local)
	bucketID := cpl
Aarsh Shah's avatar
Aarsh Shah committed
363 364
	if bucketID >= len(rt.buckets) {
		bucketID = len(rt.buckets) - 1
365 366 367
	}
	return bucketID
}