table.go 5.55 KB
Newer Older
1 2
// Package kbucket implements a Kademlia 'k-bucket' routing table.
package kbucket
3 4

import (
5
	"fmt"
6 7
	"sort"
	"sync"
8
	"time"
9

George Antoniadis's avatar
George Antoniadis committed
10
	logging "github.com/ipfs/go-log"
Jeromy's avatar
Jeromy committed
11 12
	peer "github.com/libp2p/go-libp2p-peer"
	pstore "github.com/libp2p/go-libp2p-peerstore"
13 14
)

Jeromy's avatar
Jeromy committed
15
// log is the package-level logger, created under the name "table".
var log = logging.Logger("table")
16

17 18 19 20 21 22 23 24 25
// RoutingTable defines the routing table.
type RoutingTable struct {

	// ID of the local peer
	local ID

	// Blanket lock, refine later for better performance
	tabLock sync.RWMutex

26
	// latency metrics
Jeromy's avatar
Jeromy committed
27
	metrics pstore.Metrics
28

29 30 31
	// Maximum acceptable latency for peers in this cluster
	maxLatency time.Duration

32
	// kBuckets define all the fingers to other nodes.
Jeromy's avatar
Jeromy committed
33
	Buckets    []*Bucket
34 35 36
	bucketsize int
}

Chas Leichner's avatar
Chas Leichner committed
37
// NewRoutingTable creates a new routing table with a given bucketsize, local ID, and latency tolerance.
Jeromy's avatar
Jeromy committed
38
func NewRoutingTable(bucketsize int, localID ID, latency time.Duration, m pstore.Metrics) *RoutingTable {
39
	rt := new(RoutingTable)
40
	rt.Buckets = []*Bucket{newBucket()}
41
	rt.bucketsize = bucketsize
42
	rt.local = localID
43
	rt.maxLatency = latency
44
	rt.metrics = m
45 46 47 48 49
	return rt
}

// Update adds or moves the given peer to the front of its respective bucket
// If a peer gets removed from a bucket, it is returned
50
func (rt *RoutingTable) Update(p peer.ID) {
51
	peerID := ConvertPeerID(p)
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
52
	cpl := commonPrefixLen(peerID, rt.local)
53

54 55
	rt.tabLock.Lock()
	defer rt.tabLock.Unlock()
56 57 58
	bucketID := cpl
	if bucketID >= len(rt.Buckets) {
		bucketID = len(rt.Buckets) - 1
59 60
	}

61
	bucket := rt.Buckets[bucketID]
62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88
	if bucket.Has(p) {
		// If the peer is already in the table, move it to the front.
		// This signifies that it it "more active" and the less active nodes
		// Will as a result tend towards the back of the list
		bucket.MoveToFront(p)
		return
	}

	if rt.metrics.LatencyEWMA(p) > rt.maxLatency {
		// Connection doesnt meet requirements, skip!
		return
	}

	// New peer, add to bucket
	bucket.PushFront(p)

	// Are we past the max bucket size?
	if bucket.Len() > rt.bucketsize {
		// If this bucket is the rightmost bucket, and its full
		// we need to split it and create a new bucket
		if bucketID == len(rt.Buckets)-1 {
			rt.nextBucket()
			return
		} else {
			// If the bucket cant split kick out least active node
			bucket.PopBack()
			return
89 90 91 92
		}
	}
}

93 94 95 96 97 98 99 100 101 102 103 104 105 106
// Remove deletes a peer from the routing table. This is to be used
// when we are sure a node has disconnected completely.
func (rt *RoutingTable) Remove(p peer.ID) {
	rt.tabLock.Lock()
	defer rt.tabLock.Unlock()
	peerID := ConvertPeerID(p)
	cpl := commonPrefixLen(peerID, rt.local)

	bucketID := cpl
	if bucketID >= len(rt.Buckets) {
		bucketID = len(rt.Buckets) - 1
	}

	bucket := rt.Buckets[bucketID]
107
	bucket.Remove(p)
108 109
}

110
func (rt *RoutingTable) nextBucket() peer.ID {
111 112 113
	bucket := rt.Buckets[len(rt.Buckets)-1]
	newBucket := bucket.Split(len(rt.Buckets)-1, rt.local)
	rt.Buckets = append(rt.Buckets, newBucket)
114
	if newBucket.Len() > rt.bucketsize {
115 116 117 118
		return rt.nextBucket()
	}

	// If all elements were on left side of split...
119 120
	if bucket.Len() > rt.bucketsize {
		return bucket.PopBack()
121
	}
122
	return ""
123 124
}

Jeromy's avatar
Jeromy committed
125
// Find a specific peer by ID or return nil
126
func (rt *RoutingTable) Find(id peer.ID) peer.ID {
Chas Leichner's avatar
Chas Leichner committed
127
	srch := rt.NearestPeers(ConvertPeerID(id), 1)
128 129
	if len(srch) == 0 || srch[0] != id {
		return ""
Jeromy's avatar
Jeromy committed
130 131 132 133
	}
	return srch[0]
}

134
// NearestPeer returns a single peer that is nearest to the given ID
135
func (rt *RoutingTable) NearestPeer(id ID) peer.ID {
136 137 138 139
	peers := rt.NearestPeers(id, 1)
	if len(peers) > 0 {
		return peers[0]
	}
140

Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
141
	log.Debugf("NearestPeer: Returning nil, table size = %d", rt.Size())
142
	return ""
143 144
}

145
// NearestPeers returns a list of the 'count' closest peers to the given ID
146
func (rt *RoutingTable) NearestPeers(id ID, count int) []peer.ID {
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
147
	cpl := commonPrefixLen(id, rt.local)
148

Jeromy's avatar
Jeromy committed
149 150
	rt.tabLock.RLock()

151 152 153 154 155 156 157 158
	// Get bucket at cpl index or last bucket
	var bucket *Bucket
	if cpl >= len(rt.Buckets) {
		cpl = len(rt.Buckets) - 1
	}
	bucket = rt.Buckets[cpl]

	var peerArr peerSorterArr
159 160 161 162
	peerArr = copyPeersFromList(id, peerArr, bucket.list)
	if len(peerArr) < count {
		// In the case of an unusual split, one bucket may be short or empty.
		// if this happens, search both surrounding buckets for nearby peers
163
		if cpl > 0 {
164
			plist := rt.Buckets[cpl-1].list
165 166 167
			peerArr = copyPeersFromList(id, peerArr, plist)
		}

Jeromy's avatar
Jeromy committed
168
		if cpl < len(rt.Buckets)-1 {
169
			plist := rt.Buckets[cpl+1].list
170 171 172
			peerArr = copyPeersFromList(id, peerArr, plist)
		}
	}
Jeromy's avatar
Jeromy committed
173
	rt.tabLock.RUnlock()
174 175 176 177

	// Sort by distance to local peer
	sort.Sort(peerArr)

178
	var out []peer.ID
179 180 181 182 183 184 185
	for i := 0; i < count && i < peerArr.Len(); i++ {
		out = append(out, peerArr[i].p)
	}

	return out
}

186
// Size returns the total number of peers in the routing table
187 188
func (rt *RoutingTable) Size() int {
	var tot int
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
189
	rt.tabLock.RLock()
Jeromy's avatar
Jeromy committed
190
	for _, buck := range rt.Buckets {
191
		tot += buck.Len()
192
	}
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
193
	rt.tabLock.RUnlock()
194 195 196
	return tot
}

Chas Leichner's avatar
Chas Leichner committed
197
// ListPeers takes a RoutingTable and returns a list of all peers from all buckets in the table.
198
// NOTE: This is potentially unsafe... use at your own risk
199 200
func (rt *RoutingTable) ListPeers() []peer.ID {
	var peers []peer.ID
201
	rt.tabLock.RLock()
Jeromy's avatar
Jeromy committed
202
	for _, buck := range rt.Buckets {
203
		peers = append(peers, buck.Peers()...)
204
	}
205
	rt.tabLock.RUnlock()
206 207
	return peers
}
208

Chas Leichner's avatar
Chas Leichner committed
209 210
// Print prints a descriptive statement about the provided RoutingTable
func (rt *RoutingTable) Print() {
211 212
	fmt.Printf("Routing Table, bs = %d, Max latency = %d\n", rt.bucketsize, rt.maxLatency)
	rt.tabLock.RLock()
213 214 215 216 217 218 219 220 221 222

	for i, b := range rt.Buckets {
		fmt.Printf("\tbucket: %d\n", i)

		b.lk.RLock()
		for e := b.list.Front(); e != nil; e = e.Next() {
			p := e.Value.(peer.ID)
			fmt.Printf("\t\t- %s %s\n", p.Pretty(), rt.metrics.LatencyEWMA(p).String())
		}
		b.lk.RUnlock()
223
	}
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
224
	rt.tabLock.RUnlock()
225
}