Unverified commit a0cac6f6 authored by Steven Allen, committed by GitHub

Merge pull request #46 from aarshkshah1992/feat/refresh-cpl

Refresh Cpl's, not buckets
parents fd04af92 028e6c64
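For context: the routing table keys peers by the SHA-256 hash of their peer ID, and the cpl (common prefix length) of two keys is the number of leading bits they share. This change tracks the last refresh time per cpl instead of per bucket. A minimal sketch of the cpl notion (illustrative only, not code from this PR):

```go
package example

import (
	"crypto/sha256"
	"math/bits"
)

// keyFor mirrors how kbucket derives routing-table keys: a SHA-256 hash of the peer ID bytes.
func keyFor(peerID []byte) [32]byte {
	return sha256.Sum256(peerID)
}

// commonPrefixLen counts the leading bits shared by two keys,
// which is what "cpl" refers to throughout this change.
func commonPrefixLen(a, b [32]byte) int {
	cpl := 0
	for i := range a {
		if x := a[i] ^ b[i]; x != 0 {
			return cpl + bits.LeadingZeros8(x)
		}
		cpl += 8
	}
	return cpl
}
```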
@@ -5,7 +5,6 @@ package kbucket
 import (
 	"container/list"
 	"sync"
-	"time"

 	"github.com/libp2p/go-libp2p-core/peer"
 )
@@ -14,32 +13,14 @@ import (
 type Bucket struct {
 	lk   sync.RWMutex
 	list *list.List
-
-	lastRefreshedAtLk sync.RWMutex
-	lastRefreshedAt   time.Time // the last time we looked up a key in the bucket
 }

 func newBucket() *Bucket {
 	b := new(Bucket)
 	b.list = list.New()
-	b.lastRefreshedAt = time.Now()
 	return b
 }

-func (b *Bucket) RefreshedAt() time.Time {
-	b.lastRefreshedAtLk.RLock()
-	defer b.lastRefreshedAtLk.RUnlock()
-
-	return b.lastRefreshedAt
-}
-
-func (b *Bucket) ResetRefreshedAt(newTime time.Time) {
-	b.lastRefreshedAtLk.Lock()
-	defer b.lastRefreshedAtLk.Unlock()
-
-	b.lastRefreshedAt = newTime
-}
-
 func (b *Bucket) Peers() []peer.ID {
 	b.lk.RLock()
 	defer b.lk.RUnlock()
@@ -21,6 +21,17 @@ var log = logging.Logger("table")
 var ErrPeerRejectedHighLatency = errors.New("peer rejected; latency too high")
 var ErrPeerRejectedNoCapacity = errors.New("peer rejected; insufficient capacity")

+// maxCplForRefresh is the maximum cpl we support for refresh.
+// This limit exists because we can only generate 'maxCplForRefresh' bit prefixes for now.
+const maxCplForRefresh uint = 15
+
+// CplRefresh contains a CPL(common prefix length) with the host & the last time
+// we refreshed that cpl/searched for an ID which has that cpl with the host.
+type CplRefresh struct {
+	Cpl           uint
+	LastRefreshAt time.Time
+}
+
 // RoutingTable defines the routing table.
 type RoutingTable struct {
 	// ID of the local peer
@@ -39,6 +50,9 @@ type RoutingTable struct {
 	Buckets []*Bucket
 	bucketsize int

+	cplRefreshLk   sync.RWMutex
+	cplRefreshedAt map[uint]time.Time
+
 	// notification functions
 	PeerRemoved func(peer.ID)
 	PeerAdded   func(peer.ID)
@@ -47,84 +61,71 @@ type RoutingTable struct {
 // NewRoutingTable creates a new routing table with a given bucketsize, local ID, and latency tolerance.
 func NewRoutingTable(bucketsize int, localID ID, latency time.Duration, m peerstore.Metrics) *RoutingTable {
 	rt := &RoutingTable{
-		Buckets:     []*Bucket{newBucket()},
-		bucketsize:  bucketsize,
-		local:       localID,
-		maxLatency:  latency,
-		metrics:     m,
-		PeerRemoved: func(peer.ID) {},
-		PeerAdded:   func(peer.ID) {},
+		Buckets:        []*Bucket{newBucket()},
+		bucketsize:     bucketsize,
+		local:          localID,
+		maxLatency:     latency,
+		metrics:        m,
+		cplRefreshedAt: make(map[uint]time.Time),
+		PeerRemoved:    func(peer.ID) {},
+		PeerAdded:      func(peer.ID) {},
 	}

 	return rt
 }

-// GetAllBuckets is safe to call as rt.Buckets is append-only
-// caller SHOULD NOT modify the returned slice
-func (rt *RoutingTable) GetAllBuckets() []*Bucket {
-	rt.tabLock.RLock()
-	defer rt.tabLock.RUnlock()
-	return rt.Buckets
-}
-
-// GenRandPeerID generates a random peerID in bucket=bucketID
-func (rt *RoutingTable) GenRandPeerID(bucketID int) peer.ID {
-	if bucketID < 0 {
-		panic(fmt.Sprintf("bucketID %d is not non-negative", bucketID))
-	}
-
-	rt.tabLock.RLock()
-	bucketLen := len(rt.Buckets)
-	rt.tabLock.RUnlock()
-
-	var targetCpl uint
-	if bucketID > (bucketLen - 1) {
-		targetCpl = uint(bucketLen) - 1
-	} else {
-		targetCpl = uint(bucketID)
-	}
-
-	// We can only handle upto 16 bit prefixes
-	if targetCpl > 16 {
-		targetCpl = 16
-	}
-
-	var targetPrefix uint16
-	localPrefix := binary.BigEndian.Uint16(rt.local)
-	if targetCpl < 16 {
-		// For host with ID `L`, an ID `K` belongs to a bucket with ID `B` ONLY IF CommonPrefixLen(L,K) is EXACTLY B.
-		// Hence, to achieve a targetPrefix `T`, we must toggle the (T+1)th bit in L & then copy (T+1) bits from L
-		// to our randomly generated prefix.
-		toggledLocalPrefix := localPrefix ^ (uint16(0x8000) >> targetCpl)
-		randPrefix := uint16(rand.Uint32())
-
-		// Combine the toggled local prefix and the random bits at the correct offset
-		// such that ONLY the first `targetCpl` bits match the local ID.
-		mask := (^uint16(0)) << (16 - (targetCpl + 1))
-		targetPrefix = (toggledLocalPrefix & mask) | (randPrefix & ^mask)
-	} else {
-		targetPrefix = localPrefix
-	}
+// GetTrackedCplsForRefresh returns the Cpl's we are tracking for refresh.
+// Caller is free to modify the returned slice as it is a defensive copy.
+func (rt *RoutingTable) GetTrackedCplsForRefresh() []CplRefresh {
+	rt.cplRefreshLk.RLock()
+	defer rt.cplRefreshLk.RUnlock()
+
+	cpls := make([]CplRefresh, 0, len(rt.cplRefreshedAt))
+
+	for c, t := range rt.cplRefreshedAt {
+		cpls = append(cpls, CplRefresh{c, t})
+	}
+
+	return cpls
+}
+
+// GenRandPeerID generates a random peerID for a given Cpl
+func (rt *RoutingTable) GenRandPeerID(targetCpl uint) (peer.ID, error) {
+	if targetCpl > maxCplForRefresh {
+		return "", fmt.Errorf("cannot generate peer ID for Cpl greater than %d", maxCplForRefresh)
+	}
+
+	localPrefix := binary.BigEndian.Uint16(rt.local)
+
+	// For host with ID `L`, an ID `K` belongs to a bucket with ID `B` ONLY IF CommonPrefixLen(L,K) is EXACTLY B.
+	// Hence, to achieve a targetPrefix `T`, we must toggle the (T+1)th bit in L & then copy (T+1) bits from L
+	// to our randomly generated prefix.
+	toggledLocalPrefix := localPrefix ^ (uint16(0x8000) >> targetCpl)
+	randPrefix := uint16(rand.Uint32())
+
+	// Combine the toggled local prefix and the random bits at the correct offset
+	// such that ONLY the first `targetCpl` bits match the local ID.
+	mask := (^uint16(0)) << (16 - (targetCpl + 1))
+	targetPrefix := (toggledLocalPrefix & mask) | (randPrefix & ^mask)

 	// Convert to a known peer ID.
 	key := keyPrefixMap[targetPrefix]
 	id := [34]byte{mh.SHA2_256, 32}
 	binary.BigEndian.PutUint32(id[2:], key)
-	return peer.ID(id[:])
+	return peer.ID(id[:]), nil
 }

-// Returns the bucket for a given ID
-// should NOT modify the peer list on the returned bucket
-func (rt *RoutingTable) BucketForID(id ID) *Bucket {
+// ResetCplRefreshedAtForID resets the refresh time for the Cpl of the given ID.
+func (rt *RoutingTable) ResetCplRefreshedAtForID(id ID, newTime time.Time) {
 	cpl := CommonPrefixLen(id, rt.local)
-
-	rt.tabLock.RLock()
-	defer rt.tabLock.RUnlock()
-	bucketID := cpl
-	if bucketID >= len(rt.Buckets) {
-		bucketID = len(rt.Buckets) - 1
+	if uint(cpl) > maxCplForRefresh {
+		return
 	}

-	return rt.Buckets[bucketID]
+	rt.cplRefreshLk.Lock()
+	defer rt.cplRefreshLk.Unlock()
+
+	rt.cplRefreshedAt[uint(cpl)] = newTime
 }

 // Update adds or moves the given peer to the front of its respective bucket
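The trickiest part of the new GenRandPeerID is the prefix arithmetic. Below is a standalone sketch (illustrative names, not code from this PR) that mirrors it: flipping bit targetCpl+1 of the local prefix and copying the first targetCpl bits guarantees the generated 16-bit prefix shares exactly targetCpl leading bits with the local key, which is also why refresh is capped at maxCplForRefresh = 15. The real function then maps the prefix to a precomputed peer ID via keyPrefixMap.

```go
package main

import (
	"fmt"
	"math/bits"
	"math/rand"
)

// prefixForCpl mirrors the bit manipulation in GenRandPeerID: it returns a 16-bit
// prefix whose common prefix length with localPrefix is exactly targetCpl (0..15).
func prefixForCpl(localPrefix uint16, targetCpl uint) uint16 {
	// Flip the (targetCpl+1)-th bit so the match cannot extend past targetCpl bits.
	toggled := localPrefix ^ (uint16(0x8000) >> targetCpl)
	// The top (targetCpl+1) bits come from the toggled local prefix, the rest are random.
	mask := ^uint16(0) << (16 - (targetCpl + 1))
	return (toggled & mask) | (uint16(rand.Uint32()) & ^mask)
}

func main() {
	local := uint16(0xB651) // arbitrary example prefix
	for cpl := uint(0); cpl <= 15; cpl++ {
		p := prefixForCpl(local, cpl)
		// Leading zeros of the XOR count the shared leading bits; expect exactly cpl.
		fmt.Printf("cpl=%2d sharedBits=%2d prefix=%016b\n", cpl, bits.LeadingZeros16(local^p), p)
	}
}
```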
@@ -8,6 +8,7 @@ import (
 	"github.com/libp2p/go-libp2p-core/peer"
 	"github.com/libp2p/go-libp2p-core/test"
 	pstore "github.com/libp2p/go-libp2p-peerstore"
+	"github.com/stretchr/testify/require"
 )

 // Test basic features of the bucket struct
@@ -53,48 +54,49 @@ func TestBucket(t *testing.T) {
 func TestGenRandPeerID(t *testing.T) {
 	t.Parallel()

-	nBuckets := 21
 	local := test.RandPeerIDFatal(t)
 	m := pstore.NewMetrics()
 	rt := NewRoutingTable(1, ConvertPeerID(local), time.Hour, m)

-	// create nBuckets
-	for i := 0; i < nBuckets; i++ {
-		for {
-			if p := test.RandPeerIDFatal(t); CommonPrefixLen(ConvertPeerID(local), ConvertPeerID(p)) == i {
-				rt.Update(p)
-				break
-			}
-		}
-	}
-
-	// test bucket for peer
-	peers := rt.ListPeers()
-	for _, p := range peers {
-		b := rt.BucketForID(ConvertPeerID(p))
-		if !b.Has(p) {
-			t.Fatalf("bucket should have peers %s", p.String())
-		}
-	}
-
-	// test generate rand peer ID
-	for bucketID := 0; bucketID < nBuckets; bucketID++ {
-		peerID := rt.GenRandPeerID(bucketID)
-
-		// for bucketID upto maxPrefixLen of 16, CPL should be Exactly bucketID
-		if bucketID < 16 {
-			if CommonPrefixLen(ConvertPeerID(peerID), rt.local) != bucketID {
-				t.Fatalf("cpl should be %d for bucket %d but got %d, generated peerID is %s", bucketID, bucketID,
-					CommonPrefixLen(ConvertPeerID(peerID), rt.local), peerID)
-			}
-		} else {
-			// from bucketID 16 onwards, CPL should be ATLEAST 16
-			if CommonPrefixLen(ConvertPeerID(peerID), rt.local) < 16 {
-				t.Fatalf("cpl should be ATLEAST 16 for bucket %d but got %d, generated peerID is %s", bucketID,
-					CommonPrefixLen(ConvertPeerID(peerID), rt.local), peerID)
-			}
-		}
+	// generate above maxCplForRefresh fails
+	p, err := rt.GenRandPeerID(maxCplForRefresh + 1)
+	require.Error(t, err)
+	require.Empty(t, p)
+
+	// test generate rand peer ID
+	for cpl := uint(0); cpl <= maxCplForRefresh; cpl++ {
+		peerID, err := rt.GenRandPeerID(cpl)
+		require.NoError(t, err)
+
+		require.True(t, uint(CommonPrefixLen(ConvertPeerID(peerID), rt.local)) == cpl, "failed for cpl=%d", cpl)
+	}
+}
+
+func TestRefreshAndGetTrackedCpls(t *testing.T) {
+	t.Parallel()
+
+	local := test.RandPeerIDFatal(t)
+	m := pstore.NewMetrics()
+	rt := NewRoutingTable(1, ConvertPeerID(local), time.Hour, m)
+
+	// add cpl's for tracking
+	for cpl := uint(0); cpl < maxCplForRefresh; cpl++ {
+		peerID, err := rt.GenRandPeerID(cpl)
+		require.NoError(t, err)
+		rt.ResetCplRefreshedAtForID(ConvertPeerID(peerID), time.Now())
+	}
+
+	// fetch cpl's
+	trackedCpls := rt.GetTrackedCplsForRefresh()
+	require.Len(t, trackedCpls, int(maxCplForRefresh))
+	actualCpls := make(map[uint]struct{})
+	for i := 0; i < len(trackedCpls); i++ {
+		actualCpls[trackedCpls[i].Cpl] = struct{}{}
+	}
+
+	for i := uint(0); i < maxCplForRefresh; i++ {
+		_, ok := actualCpls[i]
+		require.True(t, ok, "tracked cpl's should have cpl %d", i)
 	}
 }
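Taken together, a downstream refresh loop (for example the DHT, which lives outside this repo) might consume the new API roughly as sketched below. refreshStaleCpls, queryForID, and interval are hypothetical names used only for illustration.

```go
package example

import (
	"context"
	"time"

	"github.com/libp2p/go-libp2p-core/peer"
	kbucket "github.com/libp2p/go-libp2p-kbucket"
)

// refreshStaleCpls walks the tracked cpls and re-queries any that have gone stale.
// queryForID stands in for a network lookup for the generated peer ID.
func refreshStaleCpls(ctx context.Context, rt *kbucket.RoutingTable, interval time.Duration,
	queryForID func(context.Context, peer.ID) error) error {
	for _, c := range rt.GetTrackedCplsForRefresh() {
		if time.Since(c.LastRefreshAt) <= interval {
			continue // this cpl was refreshed recently enough
		}
		// Ask for a random peer ID whose cpl with us is exactly c.Cpl.
		randPeer, err := rt.GenRandPeerID(c.Cpl)
		if err != nil {
			return err
		}
		if err := queryForID(ctx, randPeer); err != nil {
			return err
		}
		// Record that this cpl has just been refreshed.
		rt.ResetCplRefreshedAtForID(kbucket.ConvertPeerID(randPeer), time.Now())
	}
	return nil
}
```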