Unverified commit d1a1e924 authored by Aarsh Shah, committed by GitHub

Merge pull request #59 from libp2p/feat/better-replacement

Replace dead peers & increase replacement cache size
parents 6f18d798 2bffeff6
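For context on how a consumer enables the new behaviour: cleanup & replacement only run when the table is constructed with a peer validation function. The following is a minimal usage sketch based on the options exercised in the tests below (PeerValidationFnc, TableCleanupInterval); the validation function body is a placeholder, not the DHT's real dialability check, and the numeric values are arbitrary.

package main

import (
	"context"
	"time"

	"github.com/libp2p/go-libp2p-core/peer"
	pstore "github.com/libp2p/go-libp2p-peerstore"

	kb "github.com/libp2p/go-libp2p-kbucket"
)

func main() {
	local := kb.ConvertKey("localKey")

	// placeholder validation function: a real caller (e.g. the DHT) would
	// try to connect to the peer here and report whether it is still usable.
	validate := func(ctx context.Context, p peer.ID) bool {
		return true
	}

	// passing PeerValidationFnc enables the cleanup & replacement goroutines
	// started in NewRoutingTable; the cleanup interval is configurable.
	rt, err := kb.NewRoutingTable(20, local, 10*time.Minute, pstore.NewMetrics(),
		kb.PeerValidationFnc(validate), kb.TableCleanupInterval(5*time.Minute))
	if err != nil {
		panic(err)
	}
	defer rt.Close()

	// callers report liveness & death as before; dead peers are queued for
	// replacement and swapped with validated candidates from the cache.
	p, _ := rt.GenRandPeerID(0)
	rt.HandlePeerAlive(p)
	rt.HandlePeerDead(p)
}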
@@ -19,8 +19,10 @@ type peerDistanceSorter struct {
target ID
}
func (pds *peerDistanceSorter) Len() int { return len(pds.peers) }
func (pds *peerDistanceSorter) Swap(a, b int) { pds.peers[a], pds.peers[b] = pds.peers[b], pds.peers[a] }
func (pds *peerDistanceSorter) Len() int { return len(pds.peers) }
func (pds *peerDistanceSorter) Swap(a, b int) {
pds.peers[a], pds.peers[b] = pds.peers[b], pds.peers[a]
}
func (pds *peerDistanceSorter) Less(a, b int) bool {
return pds.peers[a].distance.less(pds.peers[b].distance)
}
......
@@ -14,8 +14,6 @@ import (
"github.com/libp2p/go-libp2p-core/peerstore"
logging "github.com/ipfs/go-log"
"github.com/jbenet/goprocess"
goprocessctx "github.com/jbenet/goprocess/context"
mh "github.com/multiformats/go-multihash"
)
@@ -44,7 +42,11 @@ type CplRefresh struct {
// RoutingTable defines the routing table.
type RoutingTable struct {
// the routing table context
ctx context.Context
// function to cancel the RT context
ctxCancel context.CancelFunc
// ID of the local peer
local ID
@@ -71,17 +73,20 @@ type RoutingTable struct {
PeerRemoved func(peer.ID)
PeerAdded func(peer.ID)
// is peer replacement enabled?
isReplaceEnabled bool
// peerReplaceCh is the channel to write a peer replacement request to
peerReplaceCh chan peer.ID
// function to determine the validity of a peer for RT membership
peerValidationFnc PeerValidationFunc
// timeout for a single call to the peer validation function
peerValidationTimeout time.Duration
// interval between two runs of the table cleanup routine
tableCleanupInterval time.Duration
// function to select peers that need to be validated
// function to select peers that need to be validated during cleanup
peersForValidationFnc PeerSelectionFunc
proc goprocess.Process
}
// NewRoutingTable creates a new routing table with a given bucketsize, local ID, and latency tolerance.
@@ -95,7 +100,6 @@ func NewRoutingTable(bucketsize int, localID ID, latency time.Duration, m peerst
}
rt := &RoutingTable{
ctx: context.Background(),
buckets: []*bucket{newBucket()},
bucketsize: bucketsize,
local: localID,
@@ -108,18 +112,24 @@ func NewRoutingTable(bucketsize int, localID ID, latency time.Duration, m peerst
PeerRemoved: func(peer.ID) {},
PeerAdded: func(peer.ID) {},
peerReplaceCh: make(chan peer.ID, bucketsize*2),
peerValidationFnc: cfg.tableCleanup.peerValidationFnc,
peersForValidationFnc: cfg.tableCleanup.peersForValidationFnc,
peerValidationTimeout: cfg.tableCleanup.peerValidationTimeout,
tableCleanupInterval: cfg.tableCleanup.interval,
}
rt.cplReplacementCache = newCplReplacementCache(rt.local, rt.bucketsize)
rt.proc = goprocessctx.WithContext(rt.ctx)
// create the replacement cache
rt.cplReplacementCache = newCplReplacementCache(rt.local, rt.bucketsize*2)
rt.ctx, rt.ctxCancel = context.WithCancel(context.Background())
// schedule periodic RT cleanup & peer replacement if a peer validation function has been passed
if rt.peerValidationFnc != nil {
rt.proc.Go(rt.cleanup)
rt.isReplaceEnabled = (rt.peerValidationFnc != nil)
if rt.isReplaceEnabled {
go rt.cleanup()
go rt.startPeerReplacement()
}
return rt, nil
@@ -128,60 +138,8 @@ func NewRoutingTable(bucketsize int, localID ID, latency time.Duration, m peerst
// Close shuts down the Routing Table & all associated processes.
// It is safe to call this multiple times.
func (rt *RoutingTable) Close() error {
return rt.proc.Close()
}
func (rt *RoutingTable) cleanup(proc goprocess.Process) {
validatePeerF := func(p peer.ID) bool {
queryCtx, cancel := context.WithTimeout(rt.ctx, rt.peerValidationTimeout)
defer cancel()
return rt.peerValidationFnc(queryCtx, p)
}
cleanupTickr := time.NewTicker(rt.tableCleanupInterval)
defer cleanupTickr.Stop()
for {
select {
case <-cleanupTickr.C:
ps := rt.peersToValidate()
for _, pinfo := range ps {
// continue if we are able to successfully validate the peer
// it will be marked alive in the RT when the DHT connection notification handler calls RT.HandlePeerAlive()
// TODO Should we revisit this ? It makes more sense for the RT to mark it as active here
if validatePeerF(pinfo.Id) {
log.Infof("successfully validated missing peer=%s", pinfo.Id)
continue
}
// peer does not seem to be alive, let's try candidates now
log.Infof("failed to validate missing peer=%s, will try candidates now...", pinfo.Id)
// evict missing peer
rt.HandlePeerDead(pinfo.Id)
// keep trying replacement candidates for the missing peer till we get a successful validation or
// we run out of candidates
cpl := uint(CommonPrefixLen(ConvertPeerID(pinfo.Id), rt.local))
c, notEmpty := rt.cplReplacementCache.pop(cpl)
for notEmpty {
if validatePeerF(c) {
log.Infof("successfully validated candidate=%s for missing peer=%s", c, pinfo.Id)
break
}
log.Infof("failed to validated candidate=%s", c)
// remove candidate
rt.HandlePeerDead(c)
c, notEmpty = rt.cplReplacementCache.pop(cpl)
}
if !notEmpty {
log.Infof("failed to replace missing peer=%s as all candidates were invalid", pinfo.Id)
}
}
case <-proc.Closing():
return
}
}
rt.ctxCancel()
return nil
}
// returns the peers that need to be validated.
@@ -299,6 +257,11 @@ func (rt *RoutingTable) HandlePeerAlive(p peer.ID) (evicted peer.ID, err error)
rt.tabLock.Lock()
defer rt.tabLock.Unlock()
return rt.addPeer(p)
}
// locking is the responsibility of the caller
func (rt *RoutingTable) addPeer(p peer.ID) (evicted peer.ID, err error) {
bucketID := rt.bucketIdForPeer(p)
bucket := rt.buckets[bucketID]
if peer := bucket.getPeer(p); peer != nil {
@@ -341,17 +304,29 @@ func (rt *RoutingTable) HandlePeerAlive(p peer.ID) (evicted peer.ID, err error)
}
// HandlePeerDead should be called when the caller is sure that a peer is dead/not dialable.
// It evicts the peer from the Routing Table and also removes it as a replacement candidate if it is one.
// It evicts the peer from the Routing Table and tries to replace it with a valid & eligible
// candidate from the replacement cache.
func (rt *RoutingTable) HandlePeerDead(p peer.ID) {
// remove it as a candidate
rt.cplReplacementCache.remove(p)
// remove it from the RT
rt.tabLock.Lock()
defer rt.tabLock.Unlock()
rt.removePeer(p)
}
// locking is the responsibility of the caller
func (rt *RoutingTable) removePeer(p peer.ID) {
bucketID := rt.bucketIdForPeer(p)
bucket := rt.buckets[bucketID]
if bucket.remove(p) {
// request a replacement
if rt.isReplaceEnabled {
select {
case rt.peerReplaceCh <- p:
default:
log.Errorf("unable to request replacement for peer=%s as queue for replace requests is full", p)
}
}
// peer removed callback
rt.PeerRemoved(p)
}
}
@@ -469,12 +444,13 @@ func (rt *RoutingTable) Size() int {
// ListPeers takes a RoutingTable and returns a list of all peers from all buckets in the table.
func (rt *RoutingTable) ListPeers() []peer.ID {
var peers []peer.ID
rt.tabLock.RLock()
defer rt.tabLock.RUnlock()
var peers []peer.ID
for _, buck := range rt.buckets {
peers = append(peers, buck.peerIds()...)
}
rt.tabLock.RUnlock()
return peers
}
......
package kbucket
import (
"context"
"time"
"github.com/libp2p/go-libp2p-core/peer"
)
func (rt *RoutingTable) cleanup() {
validatePeerF := func(p peer.ID) bool {
queryCtx, cancel := context.WithTimeout(rt.ctx, rt.peerValidationTimeout)
defer cancel()
return rt.peerValidationFnc(queryCtx, p)
}
cleanupTickr := time.NewTicker(rt.tableCleanupInterval)
defer cleanupTickr.Stop()
for {
select {
case <-cleanupTickr.C:
ps := rt.peersToValidate()
for _, pinfo := range ps {
// TODO This is racy
// A peer could disconnect immediately after we validate it & would thus be in missing state again
// which means we would wrongly mark it as active here. The longer term solution is to
// handle all peer-related events in a single event loop in the RT or use more fine-grained locking at the bucket/peer level.
// See https://github.com/libp2p/go-libp2p-kbucket/issues/60
if validatePeerF(pinfo.Id) {
rt.tabLock.Lock()
// add it back/mark it as active ONLY if it is still in the RT
// to avoid adding it back if it's been marked as dead
i := rt.bucketIdForPeer(pinfo.Id)
if peer := rt.buckets[i].getPeer(pinfo.Id); peer != nil {
log.Debugf("successfully validated missing peer=%s, marking it as active", pinfo.Id)
peer.State = PeerStateActive
}
rt.tabLock.Unlock()
continue
}
// peer does not seem to be alive, let's try to replace it
// evict missing peer & request replacement ONLY if it's NOT marked as active to avoid removing a peer that connected after
// the failed validation
rt.tabLock.Lock()
i := rt.bucketIdForPeer(pinfo.Id)
p := rt.buckets[i].getPeer(pinfo.Id)
if p != nil && p.State != PeerStateActive {
log.Debugf("failed to validate missing peer=%s, evicting it from the RT & requesting a replace", pinfo.Id)
rt.removePeer(pinfo.Id)
}
rt.tabLock.Unlock()
}
case <-rt.ctx.Done():
return
}
}
}
// startPeerReplacement replaces dead peers with valid candidates from the replacement cache
func (rt *RoutingTable) startPeerReplacement() {
validatePeerF := func(p peer.ID) bool {
queryCtx, cancel := context.WithTimeout(rt.ctx, rt.peerValidationTimeout)
defer cancel()
return rt.peerValidationFnc(queryCtx, p)
}
for {
select {
case p := <-rt.peerReplaceCh:
// keep trying replacement candidates till we get a successful validation or
// we run out of candidates
cpl := uint(CommonPrefixLen(ConvertPeerID(p), rt.local))
c, notEmpty := rt.cplReplacementCache.pop(cpl)
for notEmpty {
if validatePeerF(c) {
log.Debugf("successfully validated candidate=%s for peer=%s", c, p)
// TODO There is a race here. The peer could disconnect from us or stop supporting the DHT
// protocol after the validation which means we should not be adding it to the RT here.
// See https://github.com/libp2p/go-libp2p-kbucket/issues/60
rt.tabLock.Lock()
rt.addPeer(c)
rt.tabLock.Unlock()
break
}
log.Debugf("failed to validated candidate=%s", c)
c, notEmpty = rt.cplReplacementCache.pop(cpl)
}
if !notEmpty {
log.Debugf("failed to replace missing peer=%s as all candidates were invalid", p)
}
case <-rt.ctx.Done():
return
}
}
}
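The replacement loop above pops candidates per CPL from rt.cplReplacementCache, whose implementation is not part of this diff. Below is a minimal sketch consistent with the calls made here (newCplReplacementCache, pop, remove, sized at twice the bucket size) and with the mutex-guarded candidates map the tests inspect; the push method, field names such as localPeer and maxQueueSize, and the drop-empty-queue detail are assumptions.

package kbucket

import (
	"sync"

	"github.com/libp2p/go-libp2p-core/peer"
)

// sketch of a per-CPL replacement candidate store; the real implementation
// in this repo may differ in its internals.
type cplReplacementCache struct {
	localPeer    ID
	maxQueueSize int

	sync.Mutex
	candidates map[uint][]peer.ID // candidates for each CPL, oldest first
}

func newCplReplacementCache(localPeer ID, maxQueueSize int) *cplReplacementCache {
	return &cplReplacementCache{
		localPeer:    localPeer,
		maxQueueSize: maxQueueSize,
		candidates:   make(map[uint][]peer.ID),
	}
}

// push adds a candidate for its CPL; the name & dedup/limit policy here are
// assumptions, since the push path is not shown in this diff.
func (c *cplReplacementCache) push(p peer.ID) bool {
	c.Lock()
	defer c.Unlock()

	cpl := uint(CommonPrefixLen(ConvertPeerID(p), c.localPeer))
	for _, m := range c.candidates[cpl] {
		if m == p {
			return false // already a candidate
		}
	}
	if len(c.candidates[cpl]) >= c.maxQueueSize {
		return false // queue for this CPL is full
	}
	c.candidates[cpl] = append(c.candidates[cpl], p)
	return true
}

// pop removes & returns the oldest candidate for the given CPL, if any.
func (c *cplReplacementCache) pop(cpl uint) (peer.ID, bool) {
	c.Lock()
	defer c.Unlock()

	q := c.candidates[cpl]
	if len(q) == 0 {
		return "", false
	}
	p := q[0]
	c.candidates[cpl] = q[1:]
	if len(c.candidates[cpl]) == 0 {
		// drop empty queues so the map shrinks as candidates are consumed
		delete(c.candidates, cpl)
	}
	return p, true
}

// remove deletes the given peer from its CPL queue if it is a candidate.
func (c *cplReplacementCache) remove(p peer.ID) {
	c.Lock()
	defer c.Unlock()

	cpl := uint(CommonPrefixLen(ConvertPeerID(p), c.localPeer))
	q := c.candidates[cpl]
	for i := range q {
		if q[i] == p {
			c.candidates[cpl] = append(q[:i], q[i+1:]...)
			break
		}
	}
	if len(c.candidates[cpl]) == 0 {
		delete(c.candidates, cpl)
	}
}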
package kbucket
import (
"context"
"testing"
"time"
"github.com/libp2p/go-libp2p-core/peer"
"github.com/libp2p/go-libp2p-core/test"
pstore "github.com/libp2p/go-libp2p-peerstore"
"github.com/stretchr/testify/require"
)
func TestTableCleanup(t *testing.T) {
t.Parallel()
local := test.RandPeerIDFatal(t)
// Generate:
// 6 peers with CPL 0
// 6 peers with CPL 1
cplPeerMap := make(map[int][]peer.ID)
for cpl := 0; cpl < 2; cpl++ {
i := 0
for {
p := test.RandPeerIDFatal(t)
if CommonPrefixLen(ConvertPeerID(local), ConvertPeerID(p)) == cpl {
cplPeerMap[cpl] = append(cplPeerMap[cpl], p)
i++
if i == 6 {
break
}
}
}
}
// mock peer validation fnc that successfully validates p[1], p[3] & p[5]
f := func(ctx context.Context, p peer.ID) bool {
cpl := CommonPrefixLen(ConvertPeerID(local), ConvertPeerID(p))
if cplPeerMap[cpl][1] == p || cplPeerMap[cpl][3] == p || cplPeerMap[cpl][5] == p {
return true
} else {
return false
}
}
// create RT with a very short cleanup interval
rt, err := NewRoutingTable(3, ConvertPeerID(local), time.Hour, pstore.NewMetrics(), PeerValidationFnc(f),
TableCleanupInterval(100*time.Millisecond))
require.NoError(t, err)
// for each CPL, p[0], p[1] & p[2] go into the bucket & p[3], p[4] & p[5] become candidates
for _, peers := range cplPeerMap {
for _, p := range peers {
rt.HandlePeerAlive(p)
}
}
// validate current TABLE state
require.Len(t, rt.ListPeers(), 6)
rt.tabLock.RLock()
ps0 := rt.buckets[0].peerIds()
require.Len(t, ps0, 3)
ps1 := rt.buckets[1].peerIds()
require.Len(t, ps1, 3)
require.Contains(t, ps0, cplPeerMap[0][0])
require.Contains(t, ps0, cplPeerMap[0][1])
require.Contains(t, ps0, cplPeerMap[0][2])
require.Contains(t, ps1, cplPeerMap[1][0])
require.Contains(t, ps1, cplPeerMap[1][1])
require.Contains(t, ps1, cplPeerMap[1][2])
rt.tabLock.RUnlock()
// validate current state of replacement cache
rt.cplReplacementCache.Lock()
require.Len(t, rt.cplReplacementCache.candidates, 2)
require.Len(t, rt.cplReplacementCache.candidates[0], 3)
require.Len(t, rt.cplReplacementCache.candidates[1], 3)
rt.cplReplacementCache.Unlock()
// now disconnect peers 0 1 & 2 from both buckets so it has only peer 1 left after it gets validated
for _, peers := range cplPeerMap {
rt.HandlePeerDisconnect(peers[0])
rt.HandlePeerDisconnect(peers[1])
rt.HandlePeerDisconnect(peers[2])
}
// let RT cleanup complete
time.Sleep(2 * time.Second)
// verify RT state
require.Len(t, rt.ListPeers(), 6)
rt.tabLock.RLock()
ps0 = rt.buckets[0].peerIds()
require.Len(t, ps0, 3)
ps1 = rt.buckets[1].peerIds()
require.Len(t, ps1, 3)
require.Contains(t, ps0, cplPeerMap[0][1])
require.Contains(t, ps0, cplPeerMap[0][3])
require.Contains(t, ps0, cplPeerMap[0][5])
require.Contains(t, ps1, cplPeerMap[1][1])
require.Contains(t, ps1, cplPeerMap[1][3])
require.Contains(t, ps1, cplPeerMap[1][5])
rt.tabLock.RUnlock()
// verify candidates were removed from the replacement cache
rt.cplReplacementCache.Lock()
require.Empty(t, rt.cplReplacementCache.candidates)
rt.cplReplacementCache.Unlock()
// close RT
require.NoError(t, rt.Close())
}
@@ -3,7 +3,6 @@ package kbucket
import (
"context"
"math/rand"
"sync"
"testing"
"time"
@@ -167,37 +166,53 @@ func TestHandlePeerDead(t *testing.T) {
t.Parallel()
local := test.RandPeerIDFatal(t)
var candidate peer.ID
for {
candidate = test.RandPeerIDFatal(t)
if CommonPrefixLen(ConvertPeerID(candidate), ConvertPeerID(local)) == 0 {
break
}
}
f := func(ctx context.Context, p peer.ID) bool {
if p == candidate {
return true
}
return false
}
m := pstore.NewMetrics()
rt, err := NewRoutingTable(2, ConvertPeerID(local), time.Hour, m, PeerValidationFnc(PeerAlwaysValidFnc))
rt, err := NewRoutingTable(2, ConvertPeerID(local), time.Hour, m, PeerValidationFnc(f))
require.NoError(t, err)
// push 3 peers -> 2 for the first bucket, and 1 as candidates
var peers []peer.ID
for i := 0; i < 3; i++ {
p, err := rt.GenRandPeerID(uint(0))
require.NoError(t, err)
require.NotEmpty(t, p)
rt.HandlePeerAlive(p)
peers = append(peers, p)
}
p1, _ := rt.GenRandPeerID(0)
p2, _ := rt.GenRandPeerID(0)
rt.HandlePeerAlive(p1)
rt.HandlePeerAlive(p2)
rt.HandlePeerAlive(candidate)
// ensure p1 & p2 are in the RT
require.Len(t, rt.ListPeers(), 2)
require.Contains(t, rt.ListPeers(), p1)
require.Contains(t, rt.ListPeers(), p2)
// ensure we have 1 candidate
rt.cplReplacementCache.Lock()
require.NotNil(t, rt.cplReplacementCache.candidates[uint(0)])
require.True(t, len(rt.cplReplacementCache.candidates[uint(0)]) == 1)
require.Len(t, rt.cplReplacementCache.candidates[uint(0)], 1)
require.Contains(t, rt.cplReplacementCache.candidates[uint(0)], candidate)
rt.cplReplacementCache.Unlock()
// mark a peer as dead and ensure it's not in the RT
require.NotEmpty(t, rt.Find(peers[0]))
rt.HandlePeerDead(peers[0])
require.Empty(t, rt.Find(peers[0]))
// mark the peer as dead & verify we don't get it as a candidate
rt.HandlePeerDead(peers[2])
// mark a peer as dead and ensure it's not in the RT & it gets replaced
require.NotEmpty(t, rt.Find(p1))
require.Empty(t, rt.Find(candidate))
rt.HandlePeerDead(p1)
require.Empty(t, rt.Find(p1))
time.Sleep(2 * time.Second)
require.NotEmpty(t, rt.Find(p2))
rt.cplReplacementCache.Lock()
require.Nil(t, rt.cplReplacementCache.candidates[uint(0)])
require.Empty(t, rt.cplReplacementCache.candidates)
rt.cplReplacementCache.Unlock()
require.NotEmpty(t, rt.Find(candidate))
}
func TestTableCallbacks(t *testing.T) {
@@ -578,111 +593,6 @@ func TestTableMultithreaded(t *testing.T) {
<-done
}
func TestTableCleanup(t *testing.T) {
t.Parallel()
local := test.RandPeerIDFatal(t)
// Generate:
// 6 peers with CPL 0
// 6 peers with CPL 1
cplPeerMap := make(map[int][]peer.ID)
for cpl := 0; cpl < 2; cpl++ {
i := 0
for {
p := test.RandPeerIDFatal(t)
if CommonPrefixLen(ConvertPeerID(local), ConvertPeerID(p)) == cpl {
cplPeerMap[cpl] = append(cplPeerMap[cpl], p)
i++
if i == 6 {
break
}
}
}
}
// mock peer validation fnc that successfully validates p[1], p[3] & p[5]
var addedCandidatesLk sync.Mutex
addedCandidates := make(map[peer.ID]struct{})
f := func(ctx context.Context, p peer.ID) bool {
cpl := CommonPrefixLen(ConvertPeerID(local), ConvertPeerID(p))
if cplPeerMap[cpl][1] == p || cplPeerMap[cpl][3] == p || cplPeerMap[cpl][5] == p {
// 1 is already in the RT, but 3 & 5 are candidates
if cplPeerMap[cpl][3] == p || cplPeerMap[cpl][5] == p {
addedCandidatesLk.Lock()
addedCandidates[p] = struct{}{}
addedCandidatesLk.Unlock()
}
return true
} else {
return false
}
}
// create RT with a very short cleanup interval
rt, err := NewRoutingTable(3, ConvertPeerID(local), time.Hour, pstore.NewMetrics(), PeerValidationFnc(f),
TableCleanupInterval(100*time.Millisecond))
require.NoError(t, err)
// for each CPL, p[0], p[1] & p[2] got the bucket & p[3], p[4] & p[5] become candidates
for _, peers := range cplPeerMap {
for _, p := range peers {
rt.HandlePeerAlive(p)
}
}
// validate current state
rt.tabLock.RLock()
require.Len(t, rt.ListPeers(), 6)
ps0 := rt.buckets[0].peerIds()
require.Len(t, ps0, 3)
ps1 := rt.buckets[1].peerIds()
require.Len(t, ps1, 3)
require.Contains(t, ps0, cplPeerMap[0][0])
require.Contains(t, ps0, cplPeerMap[0][1])
require.Contains(t, ps0, cplPeerMap[0][2])
require.Contains(t, ps1, cplPeerMap[1][0])
require.Contains(t, ps1, cplPeerMap[1][1])
require.Contains(t, ps1, cplPeerMap[1][2])
rt.tabLock.RUnlock()
// now disconnect peers 0 1 & 2 from both buckets so it has only 0 left after it gets validated
for _, peers := range cplPeerMap {
rt.HandlePeerDisconnect(peers[0])
rt.HandlePeerDisconnect(peers[1])
rt.HandlePeerDisconnect(peers[2])
}
// let RT cleanup complete
time.Sleep(1 * time.Second)
// verify RT state
rt.tabLock.RLock()
require.Len(t, rt.ListPeers(), 2)
ps0 = rt.buckets[0].peerIds()
require.Len(t, ps0, 1)
ps1 = rt.buckets[1].peerIds()
require.Len(t, ps1, 1)
require.Contains(t, ps0, cplPeerMap[0][1])
require.Contains(t, ps1, cplPeerMap[1][1])
rt.tabLock.RUnlock()
// verify candidate state
addedCandidatesLk.Lock()
require.Len(t, addedCandidates, 4)
require.Contains(t, addedCandidates, cplPeerMap[0][3])
require.Contains(t, addedCandidates, cplPeerMap[0][5])
require.Contains(t, addedCandidates, cplPeerMap[1][3])
require.Contains(t, addedCandidates, cplPeerMap[1][5])
addedCandidatesLk.Unlock()
// close RT
require.NoError(t, rt.Close())
}
func BenchmarkHandlePeerAlive(b *testing.B) {
b.StopTimer()
local := ConvertKey("localKey")
......