Commit 49e75721 authored by Raúl Kripalani's avatar Raúl Kripalani Committed by Matt Joiner

refinements in bucket policy.

parent d81c3c35
......@@ -2,6 +2,7 @@
package kbucket
import (
"errors"
"fmt"
"sort"
"sync"
......@@ -14,6 +15,9 @@ import (
var log = logging.Logger("table")
var ErrPeerRejectedHighLatency = errors.New("peer rejected; latency too high")
var ErrPeerRejectedNoCapacity = errors.New("peer rejected; insufficient capacity")
// RoutingTable defines the routing table.
type RoutingTable struct {
......@@ -54,8 +58,7 @@ func NewRoutingTable(bucketsize int, localID ID, latency time.Duration, m pstore
}
// Update adds or moves the given peer to the front of its respective bucket
// If a peer gets removed from a bucket, it is returned
func (rt *RoutingTable) Update(p peer.ID) {
func (rt *RoutingTable) Update(p peer.ID) (evicted peer.ID, err error) {
peerID := ConvertPeerID(p)
cpl := commonPrefixLen(peerID, rt.local)
......@@ -72,29 +75,40 @@ func (rt *RoutingTable) Update(p peer.ID) {
// This signifies that it it "more active" and the less active nodes
// Will as a result tend towards the back of the list
bucket.MoveToFront(p)
return
return "", nil
}
if rt.metrics.LatencyEWMA(p) > rt.maxLatency {
// Connection doesnt meet requirements, skip!
return
}
// New peer, add to bucket
bucket.PushFront(p)
rt.PeerAdded(p)
// Are we past the max bucket size?
if bucket.Len() > rt.bucketsize {
// If this bucket is the rightmost bucket, and its full
// we need to split it and create a new bucket
if bucketID == len(rt.Buckets)-1 {
rt.nextBucket()
} else {
// If the bucket cant split kick out least active node
rt.PeerRemoved(bucket.PopBack())
return "", ErrPeerRejectedHighLatency
}
// We have enough space in the bucket (whether spawned or grouped).
if bucket.Len() < rt.bucketsize {
bucket.PushFront(p)
rt.PeerAdded(p)
return "", nil
}
if bucketID == len(rt.Buckets)-1 {
// if the bucket is too large and this is the last bucket (i.e. wildcard), unfold it.
rt.nextBucket()
// the structure of the table has changed, so let's recheck if the peer now has a dedicated bucket.
bucketID = cpl
if bucketID >= len(rt.Buckets) {
bucketID = len(rt.Buckets) - 1
}
bucket = rt.Buckets[bucketID]
if bucket.Len() >= rt.bucketsize {
// if after all the unfolding, we're unable to find room for this peer, scrap it.
return "", ErrPeerRejectedNoCapacity
}
bucket.PushFront(p)
rt.PeerAdded(p)
return "", nil
}
return "", ErrPeerRejectedNoCapacity
}
// Remove deletes a peer from the routing table. This is to be used
......@@ -116,16 +130,17 @@ func (rt *RoutingTable) Remove(p peer.ID) {
}
func (rt *RoutingTable) nextBucket() {
// This is the last bucket, which allegedly is a mixed bag containing peers not belonging in dedicated (unfolded) buckets.
// _allegedly_ is used here to denote that *all* peers in the last bucket might feasibly belong to another bucket.
// This could happen if e.g. we've unfolded 4 buckets, and all peers in folded bucket 5 really belong in bucket 8.
bucket := rt.Buckets[len(rt.Buckets)-1]
newBucket := bucket.Split(len(rt.Buckets)-1, rt.local)
rt.Buckets = append(rt.Buckets, newBucket)
if newBucket.Len() > rt.bucketsize {
rt.nextBucket()
}
// If all elements were on left side of split...
if bucket.Len() > rt.bucketsize {
rt.PeerRemoved(bucket.PopBack())
// The newly formed bucket still contains too many peers. We probably just unfolded a empty bucket.
if newBucket.Len() >= rt.bucketsize {
// Keep unfolding the table until the last bucket is not overflowing.
rt.nextBucket()
}
}
......
......@@ -136,6 +136,35 @@ func TestTableFind(t *testing.T) {
}
}
func TestTableEldestPreferred(t *testing.T) {
local := tu.RandPeerIDFatal(t)
m := pstore.NewMetrics()
rt := NewRoutingTable(10, ConvertPeerID(local), time.Hour, m)
// generate size + 1 peers to saturate a bucket
peers := make([]peer.ID, 15)
for i := 0; i < 15; {
if p := tu.RandPeerIDFatal(t); commonPrefixLen(ConvertPeerID(local), ConvertPeerID(p)) == 0 {
peers[i] = p
i++
}
}
// test 10 first peers are accepted.
for _, p := range peers[:10] {
if _, err := rt.Update(p); err != nil {
t.Errorf("expected all 10 peers to be accepted; instead got: %v", err)
}
}
// test next 5 peers are rejected.
for _, p := range peers[10:] {
if _, err := rt.Update(p); err != ErrPeerRejectedNoCapacity {
t.Errorf("expected extra 5 peers to be rejected; instead got: %v", err)
}
}
}
func TestTableFindMultiple(t *testing.T) {
local := tu.RandPeerIDFatal(t)
m := pstore.NewMetrics()
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment