table.go 5.1 KB
Newer Older
1 2
// Package kbucket implements a Kademlia 'k-bucket' routing table.
package kbucket
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
3 4

import (
5
	"fmt"
6
	"sort"
Jeromy's avatar
Jeromy committed
7
	"sync"
8
	"time"
9

10
	peer "github.com/jbenet/go-ipfs/peer"
Jeromy's avatar
Jeromy committed
11
	u "github.com/jbenet/go-ipfs/util"
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
12 13
)

14 15
// log is the package-level logger, obtained from u.Logger under the name "table".
var log = u.Logger("table")

Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
16 17 18
// RoutingTable defines the routing table.
type RoutingTable struct {

19 20 21
	// ID of the local peer
	local ID

Jeromy's avatar
Jeromy committed
22 23 24
	// Blanket lock, refine later for better performance
	tabLock sync.RWMutex

25 26 27
	// latency metrics
	metrics peer.Metrics

28 29 30
	// Maximum acceptable latency for peers in this cluster
	maxLatency time.Duration

Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
31
	// kBuckets define all the fingers to other nodes.
Jeromy's avatar
Jeromy committed
32
	Buckets    []*Bucket
33 34 35
	bucketsize int
}

Chas Leichner's avatar
Chas Leichner committed
36
// NewRoutingTable creates a new routing table with a given bucketsize, local ID, and latency tolerance.
37
func NewRoutingTable(bucketsize int, localID ID, latency time.Duration, m peer.Metrics) *RoutingTable {
38
	rt := new(RoutingTable)
39
	rt.Buckets = []*Bucket{newBucket()}
40
	rt.bucketsize = bucketsize
41
	rt.local = localID
42
	rt.maxLatency = latency
43
	rt.metrics = m
44
	return rt
45 46 47 48
}

// Update adds or moves the given peer to the front of its respective bucket
// If a peer gets removed from a bucket, it is returned
49
func (rt *RoutingTable) Update(p peer.ID) peer.ID {
Jeromy's avatar
Jeromy committed
50 51
	rt.tabLock.Lock()
	defer rt.tabLock.Unlock()
52
	peerID := ConvertPeerID(p)
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
53
	cpl := commonPrefixLen(peerID, rt.local)
54

55 56 57
	bucketID := cpl
	if bucketID >= len(rt.Buckets) {
		bucketID = len(rt.Buckets) - 1
58 59
	}

60
	bucket := rt.Buckets[bucketID]
61
	e := bucket.find(p)
62 63
	if e == nil {
		// New peer, add to bucket
64
		if rt.metrics.LatencyEWMA(p) > rt.maxLatency {
65
			// Connection doesnt meet requirements, skip!
66
			return ""
67
		}
68
		bucket.pushFront(p)
69 70

		// Are we past the max bucket size?
71
		if bucket.len() > rt.bucketsize {
72 73
			// If this bucket is the rightmost bucket, and its full
			// we need to split it and create a new bucket
74
			if bucketID == len(rt.Buckets)-1 {
75
				return rt.nextBucket()
76 77
			} else {
				// If the bucket cant split kick out least active node
78
				return bucket.popBack()
79 80
			}
		}
81
		return ""
82
	}
83 84 85 86
	// If the peer is already in the table, move it to the front.
	// This signifies that it it "more active" and the less active nodes
	// Will as a result tend towards the back of the list
	bucket.moveToFront(e)
87
	return ""
88 89
}

90
func (rt *RoutingTable) nextBucket() peer.ID {
91 92 93 94 95 96 97 98 99 100 101
	bucket := rt.Buckets[len(rt.Buckets)-1]
	newBucket := bucket.Split(len(rt.Buckets)-1, rt.local)
	rt.Buckets = append(rt.Buckets, newBucket)
	if newBucket.len() > rt.bucketsize {
		return rt.nextBucket()
	}

	// If all elements were on left side of split...
	if bucket.len() > rt.bucketsize {
		return bucket.popBack()
	}
102
	return ""
103 104
}

Jeromy's avatar
Jeromy committed
105
// Find a specific peer by ID or return nil
106
func (rt *RoutingTable) Find(id peer.ID) peer.ID {
Chas Leichner's avatar
Chas Leichner committed
107
	srch := rt.NearestPeers(ConvertPeerID(id), 1)
108 109
	if len(srch) == 0 || srch[0] != id {
		return ""
Jeromy's avatar
Jeromy committed
110 111 112 113
	}
	return srch[0]
}

114
// NearestPeer returns a single peer that is nearest to the given ID
115
func (rt *RoutingTable) NearestPeer(id ID) peer.ID {
116
	peers := rt.NearestPeers(id, 1)
117 118 119
	if len(peers) > 0 {
		return peers[0]
	}
120

Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
121
	log.Errorf("NearestPeer: Returning nil, table size = %d", rt.Size())
122
	return ""
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
123 124
}

125
// NearestPeers returns a list of the 'count' closest peers to the given ID
126
func (rt *RoutingTable) NearestPeers(id ID, count int) []peer.ID {
Jeromy's avatar
Jeromy committed
127 128
	rt.tabLock.RLock()
	defer rt.tabLock.RUnlock()
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
129
	cpl := commonPrefixLen(id, rt.local)
130 131 132 133

	// Get bucket at cpl index or last bucket
	var bucket *Bucket
	if cpl >= len(rt.Buckets) {
134
		cpl = len(rt.Buckets) - 1
135
	}
136
	bucket = rt.Buckets[cpl]
137

138
	var peerArr peerSorterArr
139
	if bucket.len() == 0 {
140 141 142
		// In the case of an unusual split, one bucket may be empty.
		// if this happens, search both surrounding buckets for nearest peer
		if cpl > 0 {
143
			plist := rt.Buckets[cpl-1].list
Jeromy's avatar
Jeromy committed
144
			peerArr = copyPeersFromList(id, peerArr, plist)
145
		}
146

Jeromy's avatar
Jeromy committed
147
		if cpl < len(rt.Buckets)-1 {
148
			plist := rt.Buckets[cpl+1].list
Jeromy's avatar
Jeromy committed
149
			peerArr = copyPeersFromList(id, peerArr, plist)
150
		}
151
	} else {
152
		peerArr = copyPeersFromList(id, peerArr, bucket.list)
153 154
	}

155
	// Sort by distance to local peer
156 157
	sort.Sort(peerArr)

158
	var out []peer.ID
159 160 161 162 163
	for i := 0; i < count && i < peerArr.Len(); i++ {
		out = append(out, peerArr[i].p)
	}

	return out
164
}
Jeromy's avatar
Jeromy committed
165

166
// Size returns the total number of peers in the routing table
Jeromy's avatar
Jeromy committed
167 168
func (rt *RoutingTable) Size() int {
	var tot int
Jeromy's avatar
Jeromy committed
169
	for _, buck := range rt.Buckets {
170
		tot += buck.len()
Jeromy's avatar
Jeromy committed
171 172 173
	}
	return tot
}
174

Chas Leichner's avatar
Chas Leichner committed
175
// ListPeers takes a RoutingTable and returns a list of all peers from all buckets in the table.
176
// NOTE: This is potentially unsafe... use at your own risk
177 178
func (rt *RoutingTable) ListPeers() []peer.ID {
	var peers []peer.ID
Jeromy's avatar
Jeromy committed
179
	for _, buck := range rt.Buckets {
180
		for e := buck.getIter(); e != nil; e = e.Next() {
181
			peers = append(peers, e.Value.(peer.ID))
182 183 184 185
		}
	}
	return peers
}
186

Chas Leichner's avatar
Chas Leichner committed
187 188
// Print prints a descriptive statement about the provided RoutingTable
func (rt *RoutingTable) Print() {
189 190
	fmt.Printf("Routing Table, bs = %d, Max latency = %d\n", rt.bucketsize, rt.maxLatency)
	rt.tabLock.RLock()
191 192 193 194 195 196 197 198 199 200

	for i, b := range rt.Buckets {
		fmt.Printf("\tbucket: %d\n", i)

		b.lk.RLock()
		for e := b.list.Front(); e != nil; e = e.Next() {
			p := e.Value.(peer.ID)
			fmt.Printf("\t\t- %s %s\n", p.Pretty(), rt.metrics.LatencyEWMA(p).String())
		}
		b.lk.RUnlock()
201
	}
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
202
	rt.tabLock.RUnlock()
203
}