table.go 6.7 KB
Newer Older
1 2
// Package kbucket implements a Kademlia 'k-bucket' routing table.
package kbucket
3 4

import (
5
	"errors"
6
	"fmt"
7
	"sync"
8
	"time"
9

George Antoniadis's avatar
George Antoniadis committed
10
	logging "github.com/ipfs/go-log"
Jeromy's avatar
Jeromy committed
11 12
	peer "github.com/libp2p/go-libp2p-peer"
	pstore "github.com/libp2p/go-libp2p-peerstore"
13 14
)

Jeromy's avatar
Jeromy committed
15
// log is the package-level logger, created under the name "table".
var log = logging.Logger("table")
16

17 18 19
// Sentinel errors returned by RoutingTable.Update when a peer is not admitted.
var (
	// ErrPeerRejectedHighLatency reports that the peer's measured latency
	// exceeds the routing table's configured maximum.
	ErrPeerRejectedHighLatency = errors.New("peer rejected; latency too high")

	// ErrPeerRejectedNoCapacity reports that the bucket the peer maps to is
	// full and could not be unfolded to make room.
	ErrPeerRejectedNoCapacity = errors.New("peer rejected; insufficient capacity")
)

20 21 22 23 24 25 26 27 28
// RoutingTable defines the routing table: a list of k-buckets holding peers,
// indexed by the length of the prefix their ID shares with the local ID.
type RoutingTable struct {

	// ID of the local peer. Peers are assigned to buckets by the common
	// prefix length between their converted ID and this value.
	local ID

	// Blanket lock, refine later for better performance.
	// Every method that reads or modifies Buckets takes this lock.
	tabLock sync.RWMutex

	// latency metrics; Update consults LatencyEWMA to decide whether a new
	// peer is admissible.
	metrics pstore.Metrics

	// Maximum acceptable latency for peers in this cluster. Update rejects
	// peers not yet in the table whose latency EWMA exceeds this value.
	maxLatency time.Duration

	// kBuckets define all the fingers to other nodes. The last bucket acts
	// as a catch-all for every peer whose common prefix length is at least
	// its index; it is unfolded into further buckets when it fills up.
	Buckets    []*Bucket
	// bucketsize is the maximum number of peers admitted into one bucket.
	bucketsize int

	// notification functions
	// PeerRemoved is called by Remove after a peer was actually deleted and
	// PeerAdded by Update after a new peer was inserted. Both are invoked
	// while tabLock is held. NewRoutingTable initialises them to no-ops.
	PeerRemoved func(peer.ID)
	PeerAdded   func(peer.ID)
}

Chas Leichner's avatar
Chas Leichner committed
44
// NewRoutingTable creates a new routing table with a given bucketsize, local ID, and latency tolerance.
Jeromy's avatar
Jeromy committed
45
func NewRoutingTable(bucketsize int, localID ID, latency time.Duration, m pstore.Metrics) *RoutingTable {
Jeromy's avatar
Jeromy committed
46 47 48 49 50 51 52 53 54 55
	rt := &RoutingTable{
		Buckets:     []*Bucket{newBucket()},
		bucketsize:  bucketsize,
		local:       localID,
		maxLatency:  latency,
		metrics:     m,
		PeerRemoved: func(peer.ID) {},
		PeerAdded:   func(peer.ID) {},
	}

56 57 58 59
	return rt
}

// Update adds or moves the given peer to the front of its respective bucket
60
func (rt *RoutingTable) Update(p peer.ID) (evicted peer.ID, err error) {
61
	peerID := ConvertPeerID(p)
Matt Joiner's avatar
Matt Joiner committed
62
	cpl := CommonPrefixLen(peerID, rt.local)
63

64 65
	rt.tabLock.Lock()
	defer rt.tabLock.Unlock()
66 67 68
	bucketID := cpl
	if bucketID >= len(rt.Buckets) {
		bucketID = len(rt.Buckets) - 1
69 70
	}

71
	bucket := rt.Buckets[bucketID]
72 73 74 75 76
	if bucket.Has(p) {
		// If the peer is already in the table, move it to the front.
		// This signifies that it it "more active" and the less active nodes
		// Will as a result tend towards the back of the list
		bucket.MoveToFront(p)
77
		return "", nil
78 79 80 81
	}

	if rt.metrics.LatencyEWMA(p) > rt.maxLatency {
		// Connection doesnt meet requirements, skip!
82 83 84 85 86 87 88 89
		return "", ErrPeerRejectedHighLatency
	}

	// We have enough space in the bucket (whether spawned or grouped).
	if bucket.Len() < rt.bucketsize {
		bucket.PushFront(p)
		rt.PeerAdded(p)
		return "", nil
90 91
	}

92 93 94 95 96 97 98 99 100 101 102 103
	if bucketID == len(rt.Buckets)-1 {
		// if the bucket is too large and this is the last bucket (i.e. wildcard), unfold it.
		rt.nextBucket()
		// the structure of the table has changed, so let's recheck if the peer now has a dedicated bucket.
		bucketID = cpl
		if bucketID >= len(rt.Buckets) {
			bucketID = len(rt.Buckets) - 1
		}
		bucket = rt.Buckets[bucketID]
		if bucket.Len() >= rt.bucketsize {
			// if after all the unfolding, we're unable to find room for this peer, scrap it.
			return "", ErrPeerRejectedNoCapacity
104
		}
105 106 107
		bucket.PushFront(p)
		rt.PeerAdded(p)
		return "", nil
108
	}
109 110

	return "", ErrPeerRejectedNoCapacity
111 112
}

113 114 115 116 117 118
// Remove deletes a peer from the routing table. This is to be used
// when we are sure a node has disconnected completely.
func (rt *RoutingTable) Remove(p peer.ID) {
	rt.tabLock.Lock()
	defer rt.tabLock.Unlock()
	peerID := ConvertPeerID(p)
Matt Joiner's avatar
Matt Joiner committed
119
	cpl := CommonPrefixLen(peerID, rt.local)
120 121 122 123 124 125 126

	bucketID := cpl
	if bucketID >= len(rt.Buckets) {
		bucketID = len(rt.Buckets) - 1
	}

	bucket := rt.Buckets[bucketID]
127 128 129
	if bucket.Remove(p) {
		rt.PeerRemoved(p)
	}
130 131
}

132
func (rt *RoutingTable) nextBucket() {
133 134 135
	// This is the last bucket, which allegedly is a mixed bag containing peers not belonging in dedicated (unfolded) buckets.
	// _allegedly_ is used here to denote that *all* peers in the last bucket might feasibly belong to another bucket.
	// This could happen if e.g. we've unfolded 4 buckets, and all peers in folded bucket 5 really belong in bucket 8.
136 137 138 139
	bucket := rt.Buckets[len(rt.Buckets)-1]
	newBucket := bucket.Split(len(rt.Buckets)-1, rt.local)
	rt.Buckets = append(rt.Buckets, newBucket)

140 141 142 143
	// The newly formed bucket still contains too many peers. We probably just unfolded a empty bucket.
	if newBucket.Len() >= rt.bucketsize {
		// Keep unfolding the table until the last bucket is not overflowing.
		rt.nextBucket()
144 145 146
	}
}

Jeromy's avatar
Jeromy committed
147
// Find a specific peer by ID or return nil
148
func (rt *RoutingTable) Find(id peer.ID) peer.ID {
Chas Leichner's avatar
Chas Leichner committed
149
	srch := rt.NearestPeers(ConvertPeerID(id), 1)
150 151
	if len(srch) == 0 || srch[0] != id {
		return ""
Jeromy's avatar
Jeromy committed
152 153 154 155
	}
	return srch[0]
}

156
// NearestPeer returns a single peer that is nearest to the given ID
157
func (rt *RoutingTable) NearestPeer(id ID) peer.ID {
158 159 160 161
	peers := rt.NearestPeers(id, 1)
	if len(peers) > 0 {
		return peers[0]
	}
162

Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
163
	log.Debugf("NearestPeer: Returning nil, table size = %d", rt.Size())
164
	return ""
165 166
}

167
// NearestPeers returns a list of the 'count' closest peers to the given ID
168
func (rt *RoutingTable) NearestPeers(id ID, count int) []peer.ID {
Matt Joiner's avatar
Matt Joiner committed
169
	cpl := CommonPrefixLen(id, rt.local)
170

171
	// It's assumed that this also protects the buckets.
Jeromy's avatar
Jeromy committed
172 173
	rt.tabLock.RLock()

174 175 176 177 178 179 180
	// Get bucket at cpl index or last bucket
	var bucket *Bucket
	if cpl >= len(rt.Buckets) {
		cpl = len(rt.Buckets) - 1
	}
	bucket = rt.Buckets[cpl]

181 182 183 184 185 186
	pds := peerDistanceSorter{
		peers:  make([]peerDistance, 0, 3*rt.bucketsize),
		target: id,
	}
	pds = pds.appendPeersFromList(bucket.list)
	if pds.Len() < count {
187 188
		// In the case of an unusual split, one bucket may be short or empty.
		// if this happens, search both surrounding buckets for nearby peers
189
		if cpl > 0 {
190
			pds = pds.appendPeersFromList(rt.Buckets[cpl-1].list)
191
		}
Jeromy's avatar
Jeromy committed
192
		if cpl < len(rt.Buckets)-1 {
193
			pds = pds.appendPeersFromList(rt.Buckets[cpl+1].list)
194 195
		}
	}
Jeromy's avatar
Jeromy committed
196
	rt.tabLock.RUnlock()
197 198

	// Sort by distance to local peer
199
	pds.sort()
200

201 202
	if count < pds.Len() {
		pds.peers = pds.peers[:count]
203 204
	}

205 206
	out := make([]peer.ID, 0, pds.Len())
	for _, p := range pds.peers {
207
		out = append(out, p.p)
208 209 210 211 212
	}

	return out
}

213
// Size returns the total number of peers in the routing table
214 215
func (rt *RoutingTable) Size() int {
	var tot int
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
216
	rt.tabLock.RLock()
Jeromy's avatar
Jeromy committed
217
	for _, buck := range rt.Buckets {
218
		tot += buck.Len()
219
	}
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
220
	rt.tabLock.RUnlock()
221 222 223
	return tot
}

Chas Leichner's avatar
Chas Leichner committed
224
// ListPeers takes a RoutingTable and returns a list of all peers from all buckets in the table.
225 226
func (rt *RoutingTable) ListPeers() []peer.ID {
	var peers []peer.ID
227
	rt.tabLock.RLock()
Jeromy's avatar
Jeromy committed
228
	for _, buck := range rt.Buckets {
229
		peers = append(peers, buck.Peers()...)
230
	}
231
	rt.tabLock.RUnlock()
232 233
	return peers
}
234

Chas Leichner's avatar
Chas Leichner committed
235 236
// Print prints a descriptive statement about the provided RoutingTable
func (rt *RoutingTable) Print() {
237 238
	fmt.Printf("Routing Table, bs = %d, Max latency = %d\n", rt.bucketsize, rt.maxLatency)
	rt.tabLock.RLock()
239 240 241 242 243 244 245 246 247 248

	for i, b := range rt.Buckets {
		fmt.Printf("\tbucket: %d\n", i)

		b.lk.RLock()
		for e := b.list.Front(); e != nil; e = e.Next() {
			p := e.Value.(peer.ID)
			fmt.Printf("\t\t- %s %s\n", p.Pretty(), rt.metrics.LatencyEWMA(p).String())
		}
		b.lk.RUnlock()
249
	}
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
250
	rt.tabLock.RUnlock()
251
}