table.go 10 KB
Newer Older
1 2
// Package kbucket implements a kademlia 'k-bucket' routing table.
package kbucket
3 4

import (
	"encoding/binary"
	"errors"
	"fmt"
	"math/rand"
	"sync"
	"time"

	logging "github.com/ipfs/go-log"
	"github.com/libp2p/go-libp2p-core/peer"
	"github.com/libp2p/go-libp2p-core/peerstore"
	mh "github.com/multiformats/go-multihash"
)

Jeromy's avatar
Jeromy committed
19
// log is the package-level logger, created with the name "table".
var log = logging.Logger("table")
20

21 22 23
// Sentinel errors reported by RoutingTable.Update when a peer is not admitted.
var (
	// ErrPeerRejectedHighLatency is returned when a peer's latency exceeds
	// the maximum latency accepted by the routing table.
	ErrPeerRejectedHighLatency = errors.New("peer rejected; latency too high")

	// ErrPeerRejectedNoCapacity is returned when no bucket has room left
	// for the peer.
	ErrPeerRejectedNoCapacity = errors.New("peer rejected; insufficient capacity")
)

Aarsh Shah's avatar
Aarsh Shah committed
24 25 26
// maxCplForRefresh is the largest common prefix length (cpl) supported for
// refresh. The bound exists because, for now, prefixes can only be generated
// up to 'maxCplForRefresh' bits.
const maxCplForRefresh = uint(15)
Aarsh Shah's avatar
Aarsh Shah committed
27

Aarsh Shah's avatar
Aarsh Shah committed
28 29
// CplRefresh pairs a CPL (common prefix length) with the host and the last
// time that cpl was refreshed, i.e. the last time an ID having that cpl with
// the host was searched for.
type CplRefresh struct {
	// Cpl is the common prefix length with the host.
	Cpl uint

	// LastRefreshAt is the time at which this Cpl was last refreshed.
	LastRefreshAt time.Time
}

35 36 37 38 39 40 41 42
// RoutingTable defines the routing table.
type RoutingTable struct {
	// ID of the local peer
	local ID

	// Blanket lock, refine later for better performance
	tabLock sync.RWMutex

43
	// latency metrics
44
	metrics peerstore.Metrics
45

46 47 48
	// Maximum acceptable latency for peers in this cluster
	maxLatency time.Duration

49
	// kBuckets define all the fingers to other nodes.
Jeromy's avatar
Jeromy committed
50
	Buckets    []*Bucket
51
	bucketsize int
Jeromy's avatar
Jeromy committed
52

Aarsh Shah's avatar
Aarsh Shah committed
53 54 55
	cplRefreshLk   sync.RWMutex
	cplRefreshedAt map[uint]time.Time

Jeromy's avatar
Jeromy committed
56 57 58
	// notification functions
	PeerRemoved func(peer.ID)
	PeerAdded   func(peer.ID)
59 60
}

Chas Leichner's avatar
Chas Leichner committed
61
// NewRoutingTable creates a new routing table with a given bucketsize, local ID, and latency tolerance.
62
func NewRoutingTable(bucketsize int, localID ID, latency time.Duration, m peerstore.Metrics) *RoutingTable {
Jeromy's avatar
Jeromy committed
63
	rt := &RoutingTable{
Aarsh Shah's avatar
Aarsh Shah committed
64 65 66 67 68 69 70 71
		Buckets:        []*Bucket{newBucket()},
		bucketsize:     bucketsize,
		local:          localID,
		maxLatency:     latency,
		metrics:        m,
		cplRefreshedAt: make(map[uint]time.Time),
		PeerRemoved:    func(peer.ID) {},
		PeerAdded:      func(peer.ID) {},
Jeromy's avatar
Jeromy committed
72 73
	}

74 75 76
	return rt
}

Aarsh Shah's avatar
Aarsh Shah committed
77 78
// GetTrackedCplsForRefresh returns the Cpl's we are tracking for refresh.
// Caller is free to modify the returned slice as it is a defensive copy.
Aarsh Shah's avatar
Aarsh Shah committed
79
func (rt *RoutingTable) GetTrackedCplsForRefresh() []CplRefresh {
Aarsh Shah's avatar
Aarsh Shah committed
80 81
	rt.cplRefreshLk.RLock()
	defer rt.cplRefreshLk.RUnlock()
Aarsh Shah's avatar
Aarsh Shah committed
82

Aarsh Shah's avatar
Aarsh Shah committed
83
	cpls := make([]CplRefresh, len(rt.cplRefreshedAt))
Aarsh Shah's avatar
Aarsh Shah committed
84

Aarsh Shah's avatar
Aarsh Shah committed
85
	i := 0
Aarsh Shah's avatar
Aarsh Shah committed
86
	for c, t := range rt.cplRefreshedAt {
Aarsh Shah's avatar
Aarsh Shah committed
87 88
		cpls[i] = CplRefresh{c, t}
		i++
89 90
	}

Aarsh Shah's avatar
Aarsh Shah committed
91 92 93 94 95
	return cpls
}

// GenRandPeerID generates a random peerID for a given Cpl
func (rt *RoutingTable) GenRandPeerID(targetCpl uint) (peer.ID, error) {
Aarsh Shah's avatar
Aarsh Shah committed
96 97
	if targetCpl > maxCplForRefresh {
		return "", fmt.Errorf("cannot generate peer ID for Cpl greater than %d", maxCplForRefresh)
98 99
	}

100
	localPrefix := binary.BigEndian.Uint16(rt.local)
Aarsh Shah's avatar
Aarsh Shah committed
101 102 103 104 105 106 107 108 109 110 111

	// For host with ID `L`, an ID `K` belongs to a bucket with ID `B` ONLY IF CommonPrefixLen(L,K) is EXACTLY B.
	// Hence, to achieve a targetPrefix `T`, we must toggle the (T+1)th bit in L & then copy (T+1) bits from L
	// to our randomly generated prefix.
	toggledLocalPrefix := localPrefix ^ (uint16(0x8000) >> targetCpl)
	randPrefix := uint16(rand.Uint32())

	// Combine the toggled local prefix and the random bits at the correct offset
	// such that ONLY the first `targetCpl` bits match the local ID.
	mask := (^uint16(0)) << (16 - (targetCpl + 1))
	targetPrefix := (toggledLocalPrefix & mask) | (randPrefix & ^mask)
112

113 114 115
	// Convert to a known peer ID.
	key := keyPrefixMap[targetPrefix]
	id := [34]byte{mh.SHA2_256, 32}
116
	binary.BigEndian.PutUint32(id[2:], key)
Aarsh Shah's avatar
Aarsh Shah committed
117
	return peer.ID(id[:]), nil
118 119
}

Aarsh Shah's avatar
Aarsh Shah committed
120 121
// ResetCplRefreshedAtForID resets the refresh time for the Cpl of the given ID.
func (rt *RoutingTable) ResetCplRefreshedAtForID(id ID, newTime time.Time) {
122
	cpl := CommonPrefixLen(id, rt.local)
Aarsh Shah's avatar
Aarsh Shah committed
123
	if uint(cpl) > maxCplForRefresh {
Aarsh Shah's avatar
Aarsh Shah committed
124
		return
125 126
	}

Aarsh Shah's avatar
Aarsh Shah committed
127 128 129 130
	rt.cplRefreshLk.Lock()
	defer rt.cplRefreshLk.Unlock()

	rt.cplRefreshedAt[uint(cpl)] = newTime
131 132
}

133
// Update adds or moves the given peer to the front of its respective bucket
134
func (rt *RoutingTable) Update(p peer.ID) (evicted peer.ID, err error) {
135
	peerID := ConvertPeerID(p)
Matt Joiner's avatar
Matt Joiner committed
136
	cpl := CommonPrefixLen(peerID, rt.local)
137

138 139
	rt.tabLock.Lock()
	defer rt.tabLock.Unlock()
140 141 142
	bucketID := cpl
	if bucketID >= len(rt.Buckets) {
		bucketID = len(rt.Buckets) - 1
143 144
	}

145
	bucket := rt.Buckets[bucketID]
146 147 148 149 150
	if bucket.Has(p) {
		// If the peer is already in the table, move it to the front.
		// This signifies that it it "more active" and the less active nodes
		// Will as a result tend towards the back of the list
		bucket.MoveToFront(p)
151
		return "", nil
152 153 154 155
	}

	if rt.metrics.LatencyEWMA(p) > rt.maxLatency {
		// Connection doesnt meet requirements, skip!
156 157 158 159 160 161 162 163
		return "", ErrPeerRejectedHighLatency
	}

	// We have enough space in the bucket (whether spawned or grouped).
	if bucket.Len() < rt.bucketsize {
		bucket.PushFront(p)
		rt.PeerAdded(p)
		return "", nil
164 165
	}

166 167 168 169 170 171 172 173 174 175 176 177
	if bucketID == len(rt.Buckets)-1 {
		// if the bucket is too large and this is the last bucket (i.e. wildcard), unfold it.
		rt.nextBucket()
		// the structure of the table has changed, so let's recheck if the peer now has a dedicated bucket.
		bucketID = cpl
		if bucketID >= len(rt.Buckets) {
			bucketID = len(rt.Buckets) - 1
		}
		bucket = rt.Buckets[bucketID]
		if bucket.Len() >= rt.bucketsize {
			// if after all the unfolding, we're unable to find room for this peer, scrap it.
			return "", ErrPeerRejectedNoCapacity
178
		}
179 180 181
		bucket.PushFront(p)
		rt.PeerAdded(p)
		return "", nil
182
	}
183 184

	return "", ErrPeerRejectedNoCapacity
185 186
}

187 188 189 190
// Remove deletes a peer from the routing table. This is to be used
// when we are sure a node has disconnected completely.
func (rt *RoutingTable) Remove(p peer.ID) {
	peerID := ConvertPeerID(p)
Matt Joiner's avatar
Matt Joiner committed
191
	cpl := CommonPrefixLen(peerID, rt.local)
192

Steven Allen's avatar
Steven Allen committed
193 194 195
	rt.tabLock.Lock()
	defer rt.tabLock.Unlock()

196 197 198 199 200 201
	bucketID := cpl
	if bucketID >= len(rt.Buckets) {
		bucketID = len(rt.Buckets) - 1
	}

	bucket := rt.Buckets[bucketID]
202 203 204
	if bucket.Remove(p) {
		rt.PeerRemoved(p)
	}
205 206
}

207
func (rt *RoutingTable) nextBucket() {
208 209 210
	// This is the last bucket, which allegedly is a mixed bag containing peers not belonging in dedicated (unfolded) buckets.
	// _allegedly_ is used here to denote that *all* peers in the last bucket might feasibly belong to another bucket.
	// This could happen if e.g. we've unfolded 4 buckets, and all peers in folded bucket 5 really belong in bucket 8.
211 212 213 214
	bucket := rt.Buckets[len(rt.Buckets)-1]
	newBucket := bucket.Split(len(rt.Buckets)-1, rt.local)
	rt.Buckets = append(rt.Buckets, newBucket)

215 216 217 218
	// The newly formed bucket still contains too many peers. We probably just unfolded a empty bucket.
	if newBucket.Len() >= rt.bucketsize {
		// Keep unfolding the table until the last bucket is not overflowing.
		rt.nextBucket()
219 220 221
	}
}

Jeromy's avatar
Jeromy committed
222
// Find a specific peer by ID or return nil
223
func (rt *RoutingTable) Find(id peer.ID) peer.ID {
Chas Leichner's avatar
Chas Leichner committed
224
	srch := rt.NearestPeers(ConvertPeerID(id), 1)
225 226
	if len(srch) == 0 || srch[0] != id {
		return ""
Jeromy's avatar
Jeromy committed
227 228 229 230
	}
	return srch[0]
}

231
// NearestPeer returns a single peer that is nearest to the given ID
232
func (rt *RoutingTable) NearestPeer(id ID) peer.ID {
233 234 235 236
	peers := rt.NearestPeers(id, 1)
	if len(peers) > 0 {
		return peers[0]
	}
237

Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
238
	log.Debugf("NearestPeer: Returning nil, table size = %d", rt.Size())
239
	return ""
240 241
}

242
// NearestPeers returns a list of the 'count' closest peers to the given ID
243
func (rt *RoutingTable) NearestPeers(id ID, count int) []peer.ID {
244 245 246 247
	// This is the number of bits _we_ share with the key. All peers in this
	// bucket share cpl bits with us and will therefore share at least cpl+1
	// bits with the given key. +1 because both the target and all peers in
	// this bucket differ from us in the cpl bit.
Matt Joiner's avatar
Matt Joiner committed
248
	cpl := CommonPrefixLen(id, rt.local)
249

250
	// It's assumed that this also protects the buckets.
Jeromy's avatar
Jeromy committed
251 252
	rt.tabLock.RLock()

253
	// Get bucket index or last bucket
254 255 256 257
	if cpl >= len(rt.Buckets) {
		cpl = len(rt.Buckets) - 1
	}

258
	pds := peerDistanceSorter{
259
		peers:  make([]peerDistance, 0, count+rt.bucketsize),
260 261
		target: id,
	}
262

263 264 265
	// Add peers from the target bucket (cpl+1 shared bits).
	pds.appendPeersFromList(rt.Buckets[cpl].list)

266 267 268
	// If we're short, add peers from buckets to the right until we have
	// enough. All buckets to the right share exactly cpl bits (as opposed
	// to the cpl+1 bits shared by the peers in the cpl bucket).
269 270 271 272 273
	//
	// Unfortunately, to be completely correct, we can't just take from
	// buckets until we have enough peers because peers because _all_ of
	// these peers will be ~2**(256-cpl) from us.
	//
274 275 276 277
	// However, we're going to do that anyways as it's "good enough"

	for i := cpl + 1; i < len(rt.Buckets) && pds.Len() < count; i++ {
		pds.appendPeersFromList(rt.Buckets[i].list)
278 279
	}

280 281 282 283 284 285 286
	// If we're still short, add in buckets that share _fewer_ bits. We can
	// do this bucket by bucket because each bucket will share 1 fewer bit
	// than the last.
	//
	// * bucket cpl-1: cpl-2 shared bits.
	// * bucket cpl-2: cpl-3 shared bits.
	// ...
287
	for i := cpl - 1; i >= 0 && pds.Len() < count; i-- {
288
		pds.appendPeersFromList(rt.Buckets[i].list)
289
	}
Jeromy's avatar
Jeromy committed
290
	rt.tabLock.RUnlock()
291 292

	// Sort by distance to local peer
293
	pds.sort()
294

295 296
	if count < pds.Len() {
		pds.peers = pds.peers[:count]
297 298
	}

299 300
	out := make([]peer.ID, 0, pds.Len())
	for _, p := range pds.peers {
301
		out = append(out, p.p)
302 303 304 305 306
	}

	return out
}

307
// Size returns the total number of peers in the routing table
308 309
func (rt *RoutingTable) Size() int {
	var tot int
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
310
	rt.tabLock.RLock()
Jeromy's avatar
Jeromy committed
311
	for _, buck := range rt.Buckets {
312
		tot += buck.Len()
313
	}
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
314
	rt.tabLock.RUnlock()
315 316 317
	return tot
}

Chas Leichner's avatar
Chas Leichner committed
318
// ListPeers takes a RoutingTable and returns a list of all peers from all buckets in the table.
319 320
func (rt *RoutingTable) ListPeers() []peer.ID {
	var peers []peer.ID
321
	rt.tabLock.RLock()
Jeromy's avatar
Jeromy committed
322
	for _, buck := range rt.Buckets {
323
		peers = append(peers, buck.Peers()...)
324
	}
325
	rt.tabLock.RUnlock()
326 327
	return peers
}
328

Chas Leichner's avatar
Chas Leichner committed
329 330
// Print prints a descriptive statement about the provided RoutingTable
func (rt *RoutingTable) Print() {
331 332
	fmt.Printf("Routing Table, bs = %d, Max latency = %d\n", rt.bucketsize, rt.maxLatency)
	rt.tabLock.RLock()
333 334 335 336 337 338 339 340 341 342

	for i, b := range rt.Buckets {
		fmt.Printf("\tbucket: %d\n", i)

		b.lk.RLock()
		for e := b.list.Front(); e != nil; e = e.Next() {
			p := e.Value.(peer.ID)
			fmt.Printf("\t\t- %s %s\n", p.Pretty(), rt.metrics.LatencyEWMA(p).String())
		}
		b.lk.RUnlock()
343
	}
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
344
	rt.tabLock.RUnlock()
345
}