table.go 9.61 KB
Newer Older
1 2
// Package kbucket implements a Kademlia 'k-bucket' routing table.
package kbucket
3 4

import (
5
	"encoding/binary"
6
	"errors"
7
	"fmt"
8
	"math/rand"
9
	"sync"
10
	"time"
11

12 13
	"github.com/libp2p/go-libp2p-core/peer"
	"github.com/libp2p/go-libp2p-core/peerstore"
14
	mh "github.com/multiformats/go-multihash"
15

George Antoniadis's avatar
George Antoniadis committed
16
	logging "github.com/ipfs/go-log"
17 18
)

Jeromy's avatar
Jeromy committed
19
// log is the package-level logger, created under the name "table".
var log = logging.Logger("table")
20

21 22 23
// Sentinel errors returned by RoutingTable.Update when a peer is not admitted.
var (
	// ErrPeerRejectedHighLatency is returned when the peer's recorded latency
	// exceeds the table's maximum acceptable latency.
	ErrPeerRejectedHighLatency = errors.New("peer rejected; latency too high")

	// ErrPeerRejectedNoCapacity is returned when the peer's bucket is full and
	// no room could be made for it.
	ErrPeerRejectedNoCapacity = errors.New("peer rejected; insufficient capacity")
)

24 25 26 27 28 29 30 31
// RoutingTable defines the routing table.
type RoutingTable struct {
	// ID of the local peer
	local ID

	// Blanket lock, refine later for better performance
	tabLock sync.RWMutex

32
	// latency metrics
33
	metrics peerstore.Metrics
34

35 36 37
	// Maximum acceptable latency for peers in this cluster
	maxLatency time.Duration

38
	// kBuckets define all the fingers to other nodes.
Jeromy's avatar
Jeromy committed
39
	Buckets    []*Bucket
40
	bucketsize int
Jeromy's avatar
Jeromy committed
41 42 43 44

	// notification functions
	PeerRemoved func(peer.ID)
	PeerAdded   func(peer.ID)
45 46
}

Chas Leichner's avatar
Chas Leichner committed
47
// NewRoutingTable creates a new routing table with a given bucketsize, local ID, and latency tolerance.
48
func NewRoutingTable(bucketsize int, localID ID, latency time.Duration, m peerstore.Metrics) *RoutingTable {
Jeromy's avatar
Jeromy committed
49 50 51 52 53 54 55 56 57 58
	rt := &RoutingTable{
		Buckets:     []*Bucket{newBucket()},
		bucketsize:  bucketsize,
		local:       localID,
		maxLatency:  latency,
		metrics:     m,
		PeerRemoved: func(peer.ID) {},
		PeerAdded:   func(peer.ID) {},
	}

59 60 61
	return rt
}

62 63
// GetAllBuckets is safe to call as rt.Buckets is append-only
// caller SHOULD NOT modify the returned slice
Aarsh Shah's avatar
Aarsh Shah committed
64 65 66 67 68 69
func (rt *RoutingTable) GetAllBuckets() []*Bucket {
	rt.tabLock.RLock()
	defer rt.tabLock.RUnlock()
	return rt.Buckets
}

70
// GenRandPeerID generates a random peerID in bucket=bucketID
Aarsh Shah's avatar
Aarsh Shah committed
71
func (rt *RoutingTable) GenRandPeerID(bucketID int) peer.ID {
72
	if bucketID < 0 {
Aarsh Shah's avatar
Aarsh Shah committed
73
		panic(fmt.Sprintf("bucketID %d is not non-negative", bucketID))
74
	}
75 76 77 78
	rt.tabLock.RLock()
	bucketLen := len(rt.Buckets)
	rt.tabLock.RUnlock()

79
	var targetCpl uint
80
	if bucketID > (bucketLen - 1) {
81
		targetCpl = uint(bucketLen) - 1
82
	} else {
83
		targetCpl = uint(bucketID)
84 85
	}

86
	// We can only handle upto 16 bit prefixes
87 88 89 90
	if targetCpl > 16 {
		targetCpl = 16
	}

91
	var targetPrefix uint16
92
	localPrefix := binary.BigEndian.Uint16(rt.local)
93 94 95 96 97 98 99 100 101 102 103 104 105 106
	if targetCpl < 16 {
		// For host with ID `L`, an ID `K` belongs to a bucket with ID `B` ONLY IF CommonPrefixLen(L,K) is EXACTLY B.
		// Hence, to achieve a targetPrefix `T`, we must toggle the (T+1)th bit in L & then copy (T+1) bits from L
		// to our randomly generated prefix.
		toggledLocalPrefix := localPrefix ^ (uint16(0x8000) >> targetCpl)
		randPrefix := uint16(rand.Uint32())

		// Combine the toggled local prefix and the random bits at the correct offset
		// such that ONLY the first `targetCpl` bits match the local ID.
		mask := (^uint16(0)) << (16 - (targetCpl + 1))
		targetPrefix = (toggledLocalPrefix & mask) | (randPrefix & ^mask)
	} else {
		targetPrefix = localPrefix
	}
107

108 109 110
	// Convert to a known peer ID.
	key := keyPrefixMap[targetPrefix]
	id := [34]byte{mh.SHA2_256, 32}
111
	binary.BigEndian.PutUint32(id[2:], key)
Aarsh Shah's avatar
Aarsh Shah committed
112
	return peer.ID(id[:])
113 114
}

115
// Returns the bucket for a given ID
116
// should NOT modify the peer list on the returned bucket
117 118
func (rt *RoutingTable) BucketForID(id ID) *Bucket {
	cpl := CommonPrefixLen(id, rt.local)
119 120 121 122 123 124 125 126 127 128 129

	rt.tabLock.RLock()
	defer rt.tabLock.RUnlock()
	bucketID := cpl
	if bucketID >= len(rt.Buckets) {
		bucketID = len(rt.Buckets) - 1
	}

	return rt.Buckets[bucketID]
}

130
// Update adds or moves the given peer to the front of its respective bucket
131
func (rt *RoutingTable) Update(p peer.ID) (evicted peer.ID, err error) {
132
	peerID := ConvertPeerID(p)
Matt Joiner's avatar
Matt Joiner committed
133
	cpl := CommonPrefixLen(peerID, rt.local)
134

135 136
	rt.tabLock.Lock()
	defer rt.tabLock.Unlock()
137 138 139
	bucketID := cpl
	if bucketID >= len(rt.Buckets) {
		bucketID = len(rt.Buckets) - 1
140 141
	}

142
	bucket := rt.Buckets[bucketID]
143 144 145 146 147
	if bucket.Has(p) {
		// If the peer is already in the table, move it to the front.
		// This signifies that it it "more active" and the less active nodes
		// Will as a result tend towards the back of the list
		bucket.MoveToFront(p)
148
		return "", nil
149 150 151 152
	}

	if rt.metrics.LatencyEWMA(p) > rt.maxLatency {
		// Connection doesnt meet requirements, skip!
153 154 155 156 157 158 159 160
		return "", ErrPeerRejectedHighLatency
	}

	// We have enough space in the bucket (whether spawned or grouped).
	if bucket.Len() < rt.bucketsize {
		bucket.PushFront(p)
		rt.PeerAdded(p)
		return "", nil
161 162
	}

163 164 165 166 167 168 169 170 171 172 173 174
	if bucketID == len(rt.Buckets)-1 {
		// if the bucket is too large and this is the last bucket (i.e. wildcard), unfold it.
		rt.nextBucket()
		// the structure of the table has changed, so let's recheck if the peer now has a dedicated bucket.
		bucketID = cpl
		if bucketID >= len(rt.Buckets) {
			bucketID = len(rt.Buckets) - 1
		}
		bucket = rt.Buckets[bucketID]
		if bucket.Len() >= rt.bucketsize {
			// if after all the unfolding, we're unable to find room for this peer, scrap it.
			return "", ErrPeerRejectedNoCapacity
175
		}
176 177 178
		bucket.PushFront(p)
		rt.PeerAdded(p)
		return "", nil
179
	}
180 181

	return "", ErrPeerRejectedNoCapacity
182 183
}

184 185 186 187
// Remove deletes a peer from the routing table. This is to be used
// when we are sure a node has disconnected completely.
func (rt *RoutingTable) Remove(p peer.ID) {
	peerID := ConvertPeerID(p)
Matt Joiner's avatar
Matt Joiner committed
188
	cpl := CommonPrefixLen(peerID, rt.local)
189

Steven Allen's avatar
Steven Allen committed
190 191 192
	rt.tabLock.Lock()
	defer rt.tabLock.Unlock()

193 194 195 196 197 198
	bucketID := cpl
	if bucketID >= len(rt.Buckets) {
		bucketID = len(rt.Buckets) - 1
	}

	bucket := rt.Buckets[bucketID]
199 200 201
	if bucket.Remove(p) {
		rt.PeerRemoved(p)
	}
202 203
}

204
func (rt *RoutingTable) nextBucket() {
205 206 207
	// This is the last bucket, which allegedly is a mixed bag containing peers not belonging in dedicated (unfolded) buckets.
	// _allegedly_ is used here to denote that *all* peers in the last bucket might feasibly belong to another bucket.
	// This could happen if e.g. we've unfolded 4 buckets, and all peers in folded bucket 5 really belong in bucket 8.
208 209 210 211
	bucket := rt.Buckets[len(rt.Buckets)-1]
	newBucket := bucket.Split(len(rt.Buckets)-1, rt.local)
	rt.Buckets = append(rt.Buckets, newBucket)

212 213 214 215
	// The newly formed bucket still contains too many peers. We probably just unfolded a empty bucket.
	if newBucket.Len() >= rt.bucketsize {
		// Keep unfolding the table until the last bucket is not overflowing.
		rt.nextBucket()
216 217 218
	}
}

Jeromy's avatar
Jeromy committed
219
// Find a specific peer by ID or return nil
220
func (rt *RoutingTable) Find(id peer.ID) peer.ID {
Chas Leichner's avatar
Chas Leichner committed
221
	srch := rt.NearestPeers(ConvertPeerID(id), 1)
222 223
	if len(srch) == 0 || srch[0] != id {
		return ""
Jeromy's avatar
Jeromy committed
224 225 226 227
	}
	return srch[0]
}

228
// NearestPeer returns a single peer that is nearest to the given ID
229
func (rt *RoutingTable) NearestPeer(id ID) peer.ID {
230 231 232 233
	peers := rt.NearestPeers(id, 1)
	if len(peers) > 0 {
		return peers[0]
	}
234

Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
235
	log.Debugf("NearestPeer: Returning nil, table size = %d", rt.Size())
236
	return ""
237 238
}

239
// NearestPeers returns a list of the 'count' closest peers to the given ID
240
func (rt *RoutingTable) NearestPeers(id ID, count int) []peer.ID {
241 242 243 244
	// This is the number of bits _we_ share with the key. All peers in this
	// bucket share cpl bits with us and will therefore share at least cpl+1
	// bits with the given key. +1 because both the target and all peers in
	// this bucket differ from us in the cpl bit.
Matt Joiner's avatar
Matt Joiner committed
245
	cpl := CommonPrefixLen(id, rt.local)
246

247
	// It's assumed that this also protects the buckets.
Jeromy's avatar
Jeromy committed
248 249
	rt.tabLock.RLock()

250
	// Get bucket index or last bucket
251 252 253 254
	if cpl >= len(rt.Buckets) {
		cpl = len(rt.Buckets) - 1
	}

255
	pds := peerDistanceSorter{
256
		peers:  make([]peerDistance, 0, count+rt.bucketsize),
257 258
		target: id,
	}
259

260 261 262
	// Add peers from the target bucket (cpl+1 shared bits).
	pds.appendPeersFromList(rt.Buckets[cpl].list)

263 264 265
	// If we're short, add peers from buckets to the right until we have
	// enough. All buckets to the right share exactly cpl bits (as opposed
	// to the cpl+1 bits shared by the peers in the cpl bucket).
266 267 268 269 270
	//
	// Unfortunately, to be completely correct, we can't just take from
	// buckets until we have enough peers because peers because _all_ of
	// these peers will be ~2**(256-cpl) from us.
	//
271 272 273 274
	// However, we're going to do that anyways as it's "good enough"

	for i := cpl + 1; i < len(rt.Buckets) && pds.Len() < count; i++ {
		pds.appendPeersFromList(rt.Buckets[i].list)
275 276
	}

277 278 279 280 281 282 283
	// If we're still short, add in buckets that share _fewer_ bits. We can
	// do this bucket by bucket because each bucket will share 1 fewer bit
	// than the last.
	//
	// * bucket cpl-1: cpl-2 shared bits.
	// * bucket cpl-2: cpl-3 shared bits.
	// ...
284
	for i := cpl - 1; i >= 0 && pds.Len() < count; i-- {
285
		pds.appendPeersFromList(rt.Buckets[i].list)
286
	}
Jeromy's avatar
Jeromy committed
287
	rt.tabLock.RUnlock()
288 289

	// Sort by distance to local peer
290
	pds.sort()
291

292 293
	if count < pds.Len() {
		pds.peers = pds.peers[:count]
294 295
	}

296 297
	out := make([]peer.ID, 0, pds.Len())
	for _, p := range pds.peers {
298
		out = append(out, p.p)
299 300 301 302 303
	}

	return out
}

304
// Size returns the total number of peers in the routing table
305 306
func (rt *RoutingTable) Size() int {
	var tot int
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
307
	rt.tabLock.RLock()
Jeromy's avatar
Jeromy committed
308
	for _, buck := range rt.Buckets {
309
		tot += buck.Len()
310
	}
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
311
	rt.tabLock.RUnlock()
312 313 314
	return tot
}

Chas Leichner's avatar
Chas Leichner committed
315
// ListPeers takes a RoutingTable and returns a list of all peers from all buckets in the table.
316 317
func (rt *RoutingTable) ListPeers() []peer.ID {
	var peers []peer.ID
318
	rt.tabLock.RLock()
Jeromy's avatar
Jeromy committed
319
	for _, buck := range rt.Buckets {
320
		peers = append(peers, buck.Peers()...)
321
	}
322
	rt.tabLock.RUnlock()
323 324
	return peers
}
325

Chas Leichner's avatar
Chas Leichner committed
326 327
// Print prints a descriptive statement about the provided RoutingTable
func (rt *RoutingTable) Print() {
328 329
	fmt.Printf("Routing Table, bs = %d, Max latency = %d\n", rt.bucketsize, rt.maxLatency)
	rt.tabLock.RLock()
330 331 332 333 334 335 336 337 338 339

	for i, b := range rt.Buckets {
		fmt.Printf("\tbucket: %d\n", i)

		b.lk.RLock()
		for e := b.list.Front(); e != nil; e = e.Next() {
			p := e.Value.(peer.ID)
			fmt.Printf("\t\t- %s %s\n", p.Pretty(), rt.metrics.LatencyEWMA(p).String())
		}
		b.lk.RUnlock()
340
	}
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
341
	rt.tabLock.RUnlock()
342
}