table.go 9.83 KB
Newer Older
1 2
// Package kbucket implements a Kademlia 'k-bucket' routing table.
package kbucket
3 4

import (
5
	"encoding/binary"
6
	"errors"
7
	"fmt"
8
	"math/rand"
9
	"sync"
10
	"time"
11

12 13
	"github.com/libp2p/go-libp2p-core/peer"
	"github.com/libp2p/go-libp2p-core/peerstore"
14
	mh "github.com/multiformats/go-multihash"
15

George Antoniadis's avatar
George Antoniadis committed
16
	logging "github.com/ipfs/go-log"
17 18
)

Jeromy's avatar
Jeromy committed
19
// log is the package-level logger, registered under the name "table".
var log = logging.Logger("table")
20

21 22 23
// ErrPeerRejectedHighLatency is returned by RoutingTable.Update when the
// peer's latency EWMA exceeds the table's maximum acceptable latency.
var ErrPeerRejectedHighLatency = errors.New("peer rejected; latency too high")

// ErrPeerRejectedNoCapacity is returned by RoutingTable.Update when the
// bucket the peer maps to is already full and no room can be made for it.
var ErrPeerRejectedNoCapacity = errors.New("peer rejected; insufficient capacity")

Aarsh Shah's avatar
Aarsh Shah committed
24 25 26 27 28 29 30 31 32
// MaxCplForRefresh is the maximum cpl we support for refresh.
// GenRandPeerID works on a 16-bit prefix of the local ID and returns an error
// for any target cpl greater than this value; ResetCplRefreshedAtForID
// silently ignores IDs whose cpl with the local ID exceeds it.
var MaxCplForRefresh uint = 15

// CplRefresh pairs a common prefix length (Cpl) with the time recorded for
// its last refresh (LastRefreshAt). Values of this type are produced by
// RoutingTable.GetTrackedCplsForRefresh.
type CplRefresh struct {
	Cpl           uint
	LastRefreshAt time.Time
}

33 34 35 36 37 38 39 40
// RoutingTable defines the routing table.
type RoutingTable struct {
	// ID of the local peer
	local ID

	// Blanket lock, refine later for better performance
	tabLock sync.RWMutex

41
	// latency metrics
42
	metrics peerstore.Metrics
43

44 45 46
	// Maximum acceptable latency for peers in this cluster
	maxLatency time.Duration

47
	// kBuckets define all the fingers to other nodes.
Jeromy's avatar
Jeromy committed
48
	Buckets    []*Bucket
49
	bucketsize int
Jeromy's avatar
Jeromy committed
50

Aarsh Shah's avatar
Aarsh Shah committed
51 52 53
	cplRefreshLk   sync.RWMutex
	cplRefreshedAt map[uint]time.Time

Jeromy's avatar
Jeromy committed
54 55 56
	// notification functions
	PeerRemoved func(peer.ID)
	PeerAdded   func(peer.ID)
57 58
}

Chas Leichner's avatar
Chas Leichner committed
59
// NewRoutingTable creates a new routing table with a given bucketsize, local ID, and latency tolerance.
60
func NewRoutingTable(bucketsize int, localID ID, latency time.Duration, m peerstore.Metrics) *RoutingTable {
Jeromy's avatar
Jeromy committed
61
	rt := &RoutingTable{
Aarsh Shah's avatar
Aarsh Shah committed
62 63 64 65 66 67 68 69
		Buckets:        []*Bucket{newBucket()},
		bucketsize:     bucketsize,
		local:          localID,
		maxLatency:     latency,
		metrics:        m,
		cplRefreshedAt: make(map[uint]time.Time),
		PeerRemoved:    func(peer.ID) {},
		PeerAdded:      func(peer.ID) {},
Jeromy's avatar
Jeromy committed
70 71
	}

72 73 74
	return rt
}

Aarsh Shah's avatar
Aarsh Shah committed
75 76 77 78 79
// GetTrackedCplsForRefresh returns the Cpl's we are tracking for refresh.
// Caller is free to modify the returned slice as it is a defensive copy.
func (rt *RoutingTable) GetTrackedCplsForRefresh() []*CplRefresh {
	rt.cplRefreshLk.RLock()
	defer rt.cplRefreshLk.RUnlock()
Aarsh Shah's avatar
Aarsh Shah committed
80

Aarsh Shah's avatar
Aarsh Shah committed
81 82 83 84
	var cpls []*CplRefresh

	for c, t := range rt.cplRefreshedAt {
		cpls = append(cpls, &CplRefresh{c, t})
85 86
	}

Aarsh Shah's avatar
Aarsh Shah committed
87 88 89 90 91 92 93
	return cpls
}

// GenRandPeerID generates a random peerID for a given Cpl
func (rt *RoutingTable) GenRandPeerID(targetCpl uint) (peer.ID, error) {
	if targetCpl > MaxCplForRefresh {
		return "", fmt.Errorf("cannot generate peer ID for Cpl greater than %d", MaxCplForRefresh)
94 95
	}

96
	localPrefix := binary.BigEndian.Uint16(rt.local)
Aarsh Shah's avatar
Aarsh Shah committed
97 98 99 100 101 102 103 104 105 106 107

	// For host with ID `L`, an ID `K` belongs to a bucket with ID `B` ONLY IF CommonPrefixLen(L,K) is EXACTLY B.
	// Hence, to achieve a targetPrefix `T`, we must toggle the (T+1)th bit in L & then copy (T+1) bits from L
	// to our randomly generated prefix.
	toggledLocalPrefix := localPrefix ^ (uint16(0x8000) >> targetCpl)
	randPrefix := uint16(rand.Uint32())

	// Combine the toggled local prefix and the random bits at the correct offset
	// such that ONLY the first `targetCpl` bits match the local ID.
	mask := (^uint16(0)) << (16 - (targetCpl + 1))
	targetPrefix := (toggledLocalPrefix & mask) | (randPrefix & ^mask)
108

109 110 111
	// Convert to a known peer ID.
	key := keyPrefixMap[targetPrefix]
	id := [34]byte{mh.SHA2_256, 32}
112
	binary.BigEndian.PutUint32(id[2:], key)
Aarsh Shah's avatar
Aarsh Shah committed
113
	return peer.ID(id[:]), nil
114 115
}

Aarsh Shah's avatar
Aarsh Shah committed
116 117
// ResetCplRefreshedAtForID resets the refresh time for the Cpl of the given ID.
func (rt *RoutingTable) ResetCplRefreshedAtForID(id ID, newTime time.Time) {
118
	cpl := CommonPrefixLen(id, rt.local)
Aarsh Shah's avatar
Aarsh Shah committed
119 120
	if uint(cpl) > MaxCplForRefresh {
		return
121 122
	}

Aarsh Shah's avatar
Aarsh Shah committed
123 124 125 126
	rt.cplRefreshLk.Lock()
	defer rt.cplRefreshLk.Unlock()

	rt.cplRefreshedAt[uint(cpl)] = newTime
127 128
}

129
// Update adds or moves the given peer to the front of its respective bucket
130
func (rt *RoutingTable) Update(p peer.ID) (evicted peer.ID, err error) {
131
	peerID := ConvertPeerID(p)
Matt Joiner's avatar
Matt Joiner committed
132
	cpl := CommonPrefixLen(peerID, rt.local)
133

134 135
	rt.tabLock.Lock()
	defer rt.tabLock.Unlock()
136 137 138
	bucketID := cpl
	if bucketID >= len(rt.Buckets) {
		bucketID = len(rt.Buckets) - 1
139 140
	}

141
	bucket := rt.Buckets[bucketID]
142 143 144 145 146
	if bucket.Has(p) {
		// If the peer is already in the table, move it to the front.
		// This signifies that it it "more active" and the less active nodes
		// Will as a result tend towards the back of the list
		bucket.MoveToFront(p)
147
		return "", nil
148 149 150 151
	}

	if rt.metrics.LatencyEWMA(p) > rt.maxLatency {
		// Connection doesnt meet requirements, skip!
152 153 154 155 156 157 158 159
		return "", ErrPeerRejectedHighLatency
	}

	// We have enough space in the bucket (whether spawned or grouped).
	if bucket.Len() < rt.bucketsize {
		bucket.PushFront(p)
		rt.PeerAdded(p)
		return "", nil
160 161
	}

162 163 164 165 166 167 168 169 170 171 172 173
	if bucketID == len(rt.Buckets)-1 {
		// if the bucket is too large and this is the last bucket (i.e. wildcard), unfold it.
		rt.nextBucket()
		// the structure of the table has changed, so let's recheck if the peer now has a dedicated bucket.
		bucketID = cpl
		if bucketID >= len(rt.Buckets) {
			bucketID = len(rt.Buckets) - 1
		}
		bucket = rt.Buckets[bucketID]
		if bucket.Len() >= rt.bucketsize {
			// if after all the unfolding, we're unable to find room for this peer, scrap it.
			return "", ErrPeerRejectedNoCapacity
174
		}
175 176 177
		bucket.PushFront(p)
		rt.PeerAdded(p)
		return "", nil
178
	}
179 180

	return "", ErrPeerRejectedNoCapacity
181 182
}

183 184 185 186
// Remove deletes a peer from the routing table. This is to be used
// when we are sure a node has disconnected completely.
func (rt *RoutingTable) Remove(p peer.ID) {
	peerID := ConvertPeerID(p)
Matt Joiner's avatar
Matt Joiner committed
187
	cpl := CommonPrefixLen(peerID, rt.local)
188

Steven Allen's avatar
Steven Allen committed
189 190 191
	rt.tabLock.Lock()
	defer rt.tabLock.Unlock()

192 193 194 195 196 197
	bucketID := cpl
	if bucketID >= len(rt.Buckets) {
		bucketID = len(rt.Buckets) - 1
	}

	bucket := rt.Buckets[bucketID]
198 199 200
	if bucket.Remove(p) {
		rt.PeerRemoved(p)
	}
201 202
}

203
func (rt *RoutingTable) nextBucket() {
204 205 206
	// This is the last bucket, which allegedly is a mixed bag containing peers not belonging in dedicated (unfolded) buckets.
	// _allegedly_ is used here to denote that *all* peers in the last bucket might feasibly belong to another bucket.
	// This could happen if e.g. we've unfolded 4 buckets, and all peers in folded bucket 5 really belong in bucket 8.
207 208 209 210
	bucket := rt.Buckets[len(rt.Buckets)-1]
	newBucket := bucket.Split(len(rt.Buckets)-1, rt.local)
	rt.Buckets = append(rt.Buckets, newBucket)

211 212 213 214
	// The newly formed bucket still contains too many peers. We probably just unfolded a empty bucket.
	if newBucket.Len() >= rt.bucketsize {
		// Keep unfolding the table until the last bucket is not overflowing.
		rt.nextBucket()
215 216 217
	}
}

Jeromy's avatar
Jeromy committed
218
// Find a specific peer by ID or return nil
219
func (rt *RoutingTable) Find(id peer.ID) peer.ID {
Chas Leichner's avatar
Chas Leichner committed
220
	srch := rt.NearestPeers(ConvertPeerID(id), 1)
221 222
	if len(srch) == 0 || srch[0] != id {
		return ""
Jeromy's avatar
Jeromy committed
223 224 225 226
	}
	return srch[0]
}

227
// NearestPeer returns a single peer that is nearest to the given ID
228
func (rt *RoutingTable) NearestPeer(id ID) peer.ID {
229 230 231 232
	peers := rt.NearestPeers(id, 1)
	if len(peers) > 0 {
		return peers[0]
	}
233

Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
234
	log.Debugf("NearestPeer: Returning nil, table size = %d", rt.Size())
235
	return ""
236 237
}

238
// NearestPeers returns a list of the 'count' closest peers to the given ID
239
func (rt *RoutingTable) NearestPeers(id ID, count int) []peer.ID {
240 241 242 243
	// This is the number of bits _we_ share with the key. All peers in this
	// bucket share cpl bits with us and will therefore share at least cpl+1
	// bits with the given key. +1 because both the target and all peers in
	// this bucket differ from us in the cpl bit.
Matt Joiner's avatar
Matt Joiner committed
244
	cpl := CommonPrefixLen(id, rt.local)
245

246
	// It's assumed that this also protects the buckets.
Jeromy's avatar
Jeromy committed
247 248
	rt.tabLock.RLock()

249
	// Get bucket index or last bucket
250 251 252 253
	if cpl >= len(rt.Buckets) {
		cpl = len(rt.Buckets) - 1
	}

254
	pds := peerDistanceSorter{
255
		peers:  make([]peerDistance, 0, count+rt.bucketsize),
256 257
		target: id,
	}
258

259 260 261
	// Add peers from the target bucket (cpl+1 shared bits).
	pds.appendPeersFromList(rt.Buckets[cpl].list)

262 263 264
	// If we're short, add peers from buckets to the right until we have
	// enough. All buckets to the right share exactly cpl bits (as opposed
	// to the cpl+1 bits shared by the peers in the cpl bucket).
265 266 267 268 269
	//
	// Unfortunately, to be completely correct, we can't just take from
	// buckets until we have enough peers because peers because _all_ of
	// these peers will be ~2**(256-cpl) from us.
	//
270 271 272 273
	// However, we're going to do that anyways as it's "good enough"

	for i := cpl + 1; i < len(rt.Buckets) && pds.Len() < count; i++ {
		pds.appendPeersFromList(rt.Buckets[i].list)
274 275
	}

276 277 278 279 280 281 282
	// If we're still short, add in buckets that share _fewer_ bits. We can
	// do this bucket by bucket because each bucket will share 1 fewer bit
	// than the last.
	//
	// * bucket cpl-1: cpl-2 shared bits.
	// * bucket cpl-2: cpl-3 shared bits.
	// ...
283
	for i := cpl - 1; i >= 0 && pds.Len() < count; i-- {
284
		pds.appendPeersFromList(rt.Buckets[i].list)
285
	}
Jeromy's avatar
Jeromy committed
286
	rt.tabLock.RUnlock()
287 288

	// Sort by distance to local peer
289
	pds.sort()
290

291 292
	if count < pds.Len() {
		pds.peers = pds.peers[:count]
293 294
	}

295 296
	out := make([]peer.ID, 0, pds.Len())
	for _, p := range pds.peers {
297
		out = append(out, p.p)
298 299 300 301 302
	}

	return out
}

303
// Size returns the total number of peers in the routing table
304 305
func (rt *RoutingTable) Size() int {
	var tot int
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
306
	rt.tabLock.RLock()
Jeromy's avatar
Jeromy committed
307
	for _, buck := range rt.Buckets {
308
		tot += buck.Len()
309
	}
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
310
	rt.tabLock.RUnlock()
311 312 313
	return tot
}

Chas Leichner's avatar
Chas Leichner committed
314
// ListPeers takes a RoutingTable and returns a list of all peers from all buckets in the table.
315 316
func (rt *RoutingTable) ListPeers() []peer.ID {
	var peers []peer.ID
317
	rt.tabLock.RLock()
Jeromy's avatar
Jeromy committed
318
	for _, buck := range rt.Buckets {
319
		peers = append(peers, buck.Peers()...)
320
	}
321
	rt.tabLock.RUnlock()
322 323
	return peers
}
324

Chas Leichner's avatar
Chas Leichner committed
325 326
// Print prints a descriptive statement about the provided RoutingTable
func (rt *RoutingTable) Print() {
327 328
	fmt.Printf("Routing Table, bs = %d, Max latency = %d\n", rt.bucketsize, rt.maxLatency)
	rt.tabLock.RLock()
329 330 331 332 333 334 335 336 337 338

	for i, b := range rt.Buckets {
		fmt.Printf("\tbucket: %d\n", i)

		b.lk.RLock()
		for e := b.list.Front(); e != nil; e = e.Next() {
			p := e.Value.(peer.ID)
			fmt.Printf("\t\t- %s %s\n", p.Pretty(), rt.metrics.LatencyEWMA(p).String())
		}
		b.lk.RUnlock()
339
	}
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
340
	rt.tabLock.RUnlock()
341
}